forerad

Utilities for collecting and analyzing with Citibike data in Python
Log | Files | Refs | README

hourly-volume-rollup (5091B)


      1 #!/usr/bin/env python3
      2 """
      3 This script handles the population of the daily_volume_rollup table
      4 """
      5 import argparse
      6 import datetime
      7 import pandas as pd
      8 import geopandas as gpd
      9 import pathlib
     10 import os
     11 import multiprocessing
     12 
     13 import forerad.scrapers.historical as historical
     14 import forerad.persistence as persistence
     15 import forerad.utils as utils
     16 
     17 store = persistence.SQLiteStore()
     18 
     19 # The day that Citibike began publishing data on
     20 ORIGIN_DATE = datetime.date(2013, 6, 1)
     21 REPO_ROOT = pathlib.Path(__file__).parent.parent
     22 
     23 def detect_status(year: int, month: int) -> tuple[set[pd.Timestamp], set[pd.Timestamp], set[pd.Timestamp]]:
     24     """
     25     Given a month, look at the hourly rollup and generate a set of missing
     26     members
     27     """
     28     first_day = datetime.date(year, month, 1)
     29     next_month = utils.next_month(first_day.year, first_day.month)
     30     hours_in_month = pd.date_range(
     31         start=first_day,
     32         end=next_month,
     33         freq="1H",
     34         tz=utils.TZ_NYC,
     35         inclusive="left"
     36     )
     37     expected = set(hours_in_month.tz_convert(utils.TZ_UTC))
     38 
     39     rollup = store.fetch_hourly_volume_rollup(first_day, next_month)
     40     rollup.index = rollup.index.tz_convert(utils.TZ_UTC)
     41     actual = set(rollup.index)
     42 
     43     missing = expected - actual
     44 
     45     return expected, actual, missing
     46 
     47 
     48 def is_complete(archive: historical.HistoricalTripArchive) -> bool:
     49     _, _, missing = detect_status(archive.date.year, archive.date.month)
     50     return len(missing) == 0
     51 
     52 def derive_rollup(archive: historical.HistoricalTripArchive):
     53     df = archive.fetch_df()
     54 
     55     boroughs = gpd.read_file(REPO_ROOT / 'resources' / 'borough-boundaries.geojson')
     56     df = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.start_lng, df.start_lat))\
     57         .set_crs('EPSG:4326')
     58     df = df.sjoin(boroughs, predicate='within')
     59 
     60     df['trip_count'] = 1
     61     grouper = pd.Grouper(key='started_at', freq='1H')
     62     df = df.groupby([grouper, 'boro_name']).count().reset_index()
     63     df = df[['started_at', 'boro_name', 'trip_count']]
     64     df = df.pivot_table(
     65         columns=['boro_name'],
     66         index=['started_at'],
     67         fill_value=0,
     68     )
     69     df.columns = [f'{boro.lower()}_trips' for _, boro in df.columns]
     70 
     71     boros = [
     72         'manhattan', 'bronx', 'queens', 'brooklyn', 'staten_island'
     73     ]
     74     for boro in boros:
     75         # Pour one out for the one borough that this if statement will always
     76         # hit.
     77         key = f'{boro}_trips'
     78         if key not in df.columns:
     79             df[key] = 0
     80 
     81     # Join with a generated range of timestamps to ensure there are no gaps in
     82     # the resulting rollup
     83     hours_in_month = pd.DataFrame(
     84         index=pd.date_range(
     85             start=df.index.values.min(),
     86             end=df.index.values.max(),
     87             freq="1H",
     88             tz=utils.TZ_UTC,
     89         ),
     90     )
     91     hours_in_month.index.name = 'started_at'
     92     df = hours_in_month\
     93         .join(df)\
     94         .fillna(0)\
     95         .reset_index()
     96 
     97     df['local_date'] = df['started_at']\
     98         .dt.tz_convert('America/New_York')\
     99         .dt.strftime('%Y-%m-%d')
    100 
    101     return [archive, df]
    102 
    103 def main__populate(month_str: str, cores: int):
    104     archives = historical.HistoricalTripArchive.list_cached()
    105 
    106     if month_str is not None:
    107         year, month = utils.parse_month_str(month_str)
    108         archives = [a for a in archives if a.date.year == year and a.date.month == month]
    109 
    110     if not month_str:
    111         # Filter out completed rollups
    112         archives = [a for a in archives if not is_complete(a)]
    113 
    114     utils.logger.info(f'Rolling up {len(archives)} months of data')
    115     with multiprocessing.Pool(cores) as pool:
    116         if cores == 1:
    117             iterator = [derive_rollup(a) for a in archives]
    118         else:
    119             iterator = pool.imap(derive_rollup, archives)
    120 
    121         for archive, df in iterator:
    122             store.write_hourly_volume_rollup(df)
    123             utils.logger.info(f"Wrote {len(df)} members to table for {archive.month_str}")
    124 
    125 
    126 def main__status():
    127     available_archives = historical.HistoricalTripArchive.list_cached()
    128 
    129     print('Archive status:')
    130     for archive in available_archives:
    131         expected, actual, missing = detect_status(archive.date.year, archive.date.month)
    132         if len(missing) == 0:
    133             presence = '[x]'
    134         elif len(actual) > len(expected):
    135             presence = '[!]'
    136         else:
    137             presence = '[ ]'
    138 
    139         print(f'\t{presence} {archive.month_str} ({len(actual)}/{len(expected)})')
    140 
    141 
    142 if __name__ == '__main__':
    143     parser = argparse.ArgumentParser(
    144         description="Backfill historical trip data into your data store",
    145     )
    146     parser.add_argument('action', choices=['status', 'populate'])
    147     parser.add_argument('--month', help="The month to populate, in YYYY-MM format")
    148     parser.add_argument(
    149         '--cores',
    150         type=int,
    151         default=1,
    152         help="how many cores to process with (be careful with memory requirements!)"
    153     )
    154 
    155     args = parser.parse_args()
    156 
    157     if args.action == 'populate':
    158         main__populate(args.month, args.cores)
    159     elif args.action == 'status':
    160         main__status()