commit 247fad46c7c38049c7a41c44e4c789885402c19c
parent 6fa5f2e5a971d970311bfd994513736593ebfd3e
Author: Steve Gattuso <steve@stevegattuso.me>
Date: Mon, 13 Nov 2023 17:02:03 +0100
improve reliability of rollup
Diffstat:
1 file changed, 49 insertions(+), 10 deletions(-)
diff --git a/bin/hourly-volume-rollup b/bin/hourly-volume-rollup
@@ -7,6 +7,8 @@ import datetime
import pandas as pd
import geopandas as gpd
import pathlib
+import os
+import multiprocessing
import forerad.scrapers.historical as historical
import forerad.persistence as persistence
@@ -35,9 +37,11 @@ def detect_status(year: int, month: int) -> tuple[set[pd.Timestamp], set[pd.Time
expected = set(hours_in_month.tz_convert(utils.TZ_UTC))
rollup = store.fetch_hourly_volume_rollup(first_day, next_month)
+ rollup.index = rollup.index.tz_convert(utils.TZ_UTC)
actual = set(rollup.index)
missing = expected - actual
+
return expected, actual, missing
@@ -45,8 +49,8 @@ def is_complete(archive: historical.HistoricalTripArchive) -> bool:
_, _, missing = detect_status(archive.date.year, archive.date.month)
return len(missing) == 0
-def derive_rollup(a: historical.HistoricalTripArchive):
- df = a.fetch_df()
+def derive_rollup(archive: historical.HistoricalTripArchive):
+ df = archive.fetch_df()
boroughs = gpd.read_file(REPO_ROOT / 'resources' / 'borough-boundaries.geojson')
df = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.start_lng, df.start_lat))\
@@ -74,16 +78,31 @@ def derive_rollup(a: historical.HistoricalTripArchive):
if key not in df.columns:
df[key] = 0
- df = df.reset_index()
+ # Join with a generated range of timestamps to ensure there are no gaps in
+ # the resulting rollup
+ hours_in_month = pd.DataFrame(
+ index=pd.date_range(
+ start=df.index.values.min(),
+ end=df.index.values.max(),
+ freq="1H",
+ tz=utils.TZ_UTC,
+ ),
+ )
+ hours_in_month.index.name = 'started_at'
+ df = hours_in_month\
+ .join(df)\
+ .fillna(0)\
+ .reset_index()
+
df['local_date'] = df['started_at']\
.dt.tz_convert('America/New_York')\
.dt.strftime('%Y-%m-%d')
- store.write_hourly_volume_rollup(df)
- utils.logger.info(f"Wrote {len(df)} members to table for {a.month_str}")
+ return [archive, df]
-def main__populate(month_str):
+def main__populate(month_str: str, cores: int):
archives = historical.HistoricalTripArchive.list_cached()
+
if month_str is not None:
year, month = utils.parse_month_str(month_str)
archives = [a for a in archives if a.date.year == year and a.date.month == month]
@@ -93,7 +112,15 @@ def main__populate(month_str):
archives = [a for a in archives if not is_complete(a)]
utils.logger.info(f'Rolling up {len(archives)} months of data')
- [derive_rollup(a) for a in archives]
+ with multiprocessing.Pool(cores) as pool:
+ if cores == 1:
+ iterator = [derive_rollup(a) for a in archives]
+ else:
+ iterator = pool.imap(derive_rollup, archives)
+
+ for archive, df in iterator:
+ store.write_hourly_volume_rollup(df)
+ utils.logger.info(f"Wrote {len(df)} members to table for {archive.month_str}")
def main__status():
@@ -102,8 +129,14 @@ def main__status():
print('Archive status:')
for archive in available_archives:
expected, actual, missing = detect_status(archive.date.year, archive.date.month)
- presence = '[x]' if len(missing) == 0 else '[ ]'
- print(f'\t{presence} {archive.date.year}-{archive.date.month:02d} ({len(actual)}/{len(expected)})')
+ if len(missing) == 0:
+ presence = '[x]'
+ elif len(actual) > len(expected):
+ presence = '[!]'
+ else:
+ presence = '[ ]'
+
+ print(f'\t{presence} {archive.month_str} ({len(actual)}/{len(expected)})')
if __name__ == '__main__':
@@ -112,10 +145,16 @@ if __name__ == '__main__':
)
parser.add_argument('action', choices=['status', 'populate'])
parser.add_argument('--month', help="The month to populate, in YYYY-MM format")
+ parser.add_argument(
+ '--cores',
+ type=int,
+ default=1,
+ help="how many cores to process with (be careful with memory requirements!)"
+ )
args = parser.parse_args()
if args.action == 'populate':
- main__populate(args.month)
+ main__populate(args.month, args.cores)
elif args.action == 'status':
main__status()