hourly-volume-rollup (5091B)
#!/usr/bin/env python3
"""
Populate the hourly volume rollup table from cached historical trip archives.

Actions:
    status    -- for each cached archive month, report how many of the
                 month's hours are present in the hourly rollup.
    populate  -- derive per-borough hourly trip counts from cached archives
                 and write them to the store.
"""
import argparse
import datetime
import pandas as pd
import geopandas as gpd
import pathlib
import os
import multiprocessing

import forerad.scrapers.historical as historical
import forerad.persistence as persistence
import forerad.utils as utils

store = persistence.SQLiteStore()

# The day Citi Bike began publishing trip data.
ORIGIN_DATE = datetime.date(2013, 6, 1)
REPO_ROOT = pathlib.Path(__file__).parent.parent

# Snake-cased borough names; each gets a `<borough>_trips` column in the rollup.
BOROUGHS = ['manhattan', 'bronx', 'queens', 'brooklyn', 'staten_island']


def detect_status(year: int, month: int) -> tuple[set[pd.Timestamp], set[pd.Timestamp], set[pd.Timestamp]]:
    """
    Compare the hours a month should have against the stored hourly rollup.

    The month's hours are generated in ``utils.TZ_NYC`` local time, from the
    first of the month up to (not including) the first of the next month.
    They are then converted to ``utils.TZ_UTC`` so they can be compared with
    the rollup rows fetched from the store.

    Returns:
        ``(expected, actual, missing)``: the UTC hour timestamps the month
        should contain, the UTC timestamps present in the rollup, and
        ``expected - actual``.
    """
    first_day = datetime.date(year, month, 1)
    next_month = utils.next_month(first_day.year, first_day.month)
    hours_in_month = pd.date_range(
        start=first_day,
        end=next_month,
        freq="1H",
        tz=utils.TZ_NYC,
        inclusive="left"
    )
    expected = set(hours_in_month.tz_convert(utils.TZ_UTC))

    rollup = store.fetch_hourly_volume_rollup(first_day, next_month)
    rollup.index = rollup.index.tz_convert(utils.TZ_UTC)
    actual = set(rollup.index)

    missing = expected - actual

    return expected, actual, missing


def is_complete(archive: historical.HistoricalTripArchive) -> bool:
    """Return True when every expected hour of the archive's month is in the rollup."""
    _, _, missing = detect_status(archive.date.year, archive.date.month)
    return len(missing) == 0


def derive_rollup(archive: historical.HistoricalTripArchive):
    """
    Build the hourly, per-borough trip-count rollup for one archive.

    Each trip is assigned to the borough boundary that contains its start
    point. Trips starting outside every borough are dropped by the spatial
    join. Counts are bucketed by the hour of ``started_at``.

    Returns:
        ``[archive, df]``. ``df`` has one row per hour from the first to the
        last observed hour, with these columns:

        - ``started_at``
        - one integer ``<borough>_trips`` column per entry in ``BOROUGHS``
        - ``local_date``: the New York calendar date as ``YYYY-MM-DD``

        The archive is returned too, so results coming back from
        ``Pool.imap`` can be attributed to their month.
    """
    df = archive.fetch_df()

    boroughs = gpd.read_file(REPO_ROOT / 'resources' / 'borough-boundaries.geojson')
    df = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.start_lng, df.start_lat))\
        .set_crs('EPSG:4326')
    df = df.sjoin(boroughs, predicate='within')

    df['trip_count'] = 1
    grouper = pd.Grouper(key='started_at', freq='1H')
    df = df.groupby([grouper, 'boro_name']).count().reset_index()
    df = df[['started_at', 'boro_name', 'trip_count']]
    df = df.pivot_table(
        columns=['boro_name'],
        index=['started_at'],
        fill_value=0,
    )
    # Borough names such as "Staten Island" contain spaces. Snake-case them so
    # the column is `staten_island_trips`, matching BOROUGHS, rather than a
    # stray `staten island_trips` column.
    df.columns = [
        f"{boro.lower().replace(' ', '_')}_trips" for _, boro in df.columns
    ]

    trip_columns = [f'{boro}_trips' for boro in BOROUGHS]
    for key in trip_columns:
        # A borough with no trips in this archive gets no pivot column at all,
        # so add it explicitly to keep the output schema fixed.
        if key not in df.columns:
            df[key] = 0

    # Join with a generated range of timestamps so there are no gaps in the
    # resulting rollup.
    # NOTE(review): this range spans only the first..last hour that had trips,
    # not the whole month. If a month's first or last local hours had no
    # trips, those hours are never written, and detect_status (which expects
    # every hour of the month) keeps reporting the month as incomplete.
    # Confirm whether the range should cover the full month instead.
    hours_in_month = pd.DataFrame(
        index=pd.date_range(
            start=df.index.values.min(),
            end=df.index.values.max(),
            freq="1H",
            tz=utils.TZ_UTC,
        ),
    )
    hours_in_month.index.name = 'started_at'
    df = hours_in_month\
        .join(df)\
        .fillna(0)\
        .reset_index()
    # The left join introduces NaN for empty hours, which upcasts the counts
    # to float. They are whole trip counts, so restore an integer dtype.
    df[trip_columns] = df[trip_columns].astype(int)

    df['local_date'] = df['started_at']\
        .dt.tz_convert('America/New_York')\
        .dt.strftime('%Y-%m-%d')

    return [archive, df]


def _write_rollups(results):
    """Persist each ``(archive, df)`` rollup as soon as it is produced."""
    for archive, df in results:
        store.write_hourly_volume_rollup(df)
        utils.logger.info(f"Wrote {len(df)} members to table for {archive.month_str}")


def main__populate(month_str: str, cores: int):
    """
    Derive and write hourly rollups for cached archives.

    Args:
        month_str: a month accepted by ``utils.parse_month_str`` (``YYYY-MM``
            per the CLI help) to rebuild just that month. Pass ``None`` to
            build every cached month whose rollup is incomplete.
        cores: number of worker processes used to derive rollups. ``1`` (or
            less) runs serially in this process without starting a pool.
    """
    archives = historical.HistoricalTripArchive.list_cached()

    if month_str is not None:
        year, month = utils.parse_month_str(month_str)
        archives = [a for a in archives if a.date.year == year and a.date.month == month]
    else:
        # Filter out completed rollups
        archives = [a for a in archives if not is_complete(a)]

    utils.logger.info(f'Rolling up {len(archives)} months of data')
    if cores > 1:
        with multiprocessing.Pool(cores) as pool:
            _write_rollups(pool.imap(derive_rollup, archives))
    else:
        # map() is lazy: each month is derived, written, and released before
        # the next starts, instead of holding every month's DataFrame at once.
        _write_rollups(map(derive_rollup, archives))


def main__status():
    """
    Print, for each cached archive, how complete its hourly rollup is.

    Markers: ``[!]`` means more rollup hours than expected, ``[x]`` means
    complete, and ``[ ]`` means some hours are missing. Counts are shown as
    ``actual/expected``.
    """
    available_archives = historical.HistoricalTripArchive.list_cached()

    print('Archive status:')
    for archive in available_archives:
        expected, actual, missing = detect_status(archive.date.year, archive.date.month)
        # Check for surplus rows first. A month can contain every expected
        # hour plus extra ones, and the completeness check alone would hide it.
        if len(actual) > len(expected):
            presence = '[!]'
        elif len(missing) == 0:
            presence = '[x]'
        else:
            presence = '[ ]'

        print(f'\t{presence} {archive.month_str} ({len(actual)}/{len(expected)})')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Backfill historical trip data into your data store",
    )
    parser.add_argument('action', choices=['status', 'populate'])
    parser.add_argument('--month', help="The month to populate, in YYYY-MM format")
    parser.add_argument(
        '--cores',
        type=int,
        default=1,
        help="how many cores to process with (be careful with memory requirements!)"
    )

    args = parser.parse_args()

    if args.action == 'populate':
        main__populate(args.month, args.cores)
    elif args.action == 'status':
        main__status()