forerad

Utilities for collecting and analyzing Citibike data in Python

historical.py (7061B)


import re
import datetime
import zipfile
import io
import boto3
import pathlib
import pandas as pd
import numpy as np
import botocore.client as bclient
from botocore import UNSIGNED

import forerad.utils as utils

# Notice the `(i)?` - they spelled Citibike wrong in one of the archives... lol
ARCHIVE_REGEX = re.compile(r"^([0-9]{4})([0-9]{2})-cit(i)?bike-tripdata((\.zip$)|(\.csv\.zip$))")
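# The pattern matches keys like '201306-citibike-tripdata.zip' or
# '202102-citibike-tripdata.csv.zip' (illustrative names; any key following
# the bucket's monthly naming scheme will do).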
CACHE_DIR = pathlib.Path(__file__).parent.parent.parent / pathlib.Path('.forerad-cache')
TRIP_BUCKET = 'tripdata'
if not CACHE_DIR.exists():
    CACHE_DIR.mkdir()
    utils.logger.debug('Initializing .forerad-cache dir')


class HistoricalTripArchive:
    object_key: str
    date: datetime.date

    def __init__(self, object_key, date):
        self.object_key = object_key
        self.date = date

    @classmethod
    def __get_s3_client(cls):
        config = bclient.Config(signature_version=UNSIGNED)
        return boto3.client('s3', config=config)

    @classmethod
    def from_s3(cls, obj: dict):
        """
        Converts an S3 object dictionary into a HistoricalTripArchive record,
        returning None if the object doesn't look like a monthly ride archive.
        """
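        # e.g. {'Key': '201306-citibike-tripdata.zip', ...} (an illustrative
        # listing entry) parses to an archive dated 2013-06-01.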
        match = ARCHIVE_REGEX.match(obj['Key'])
        if match is None:
            utils.logger.debug(f"Skipping non-archive object {obj['Key']}")
            return None

        groups = match.groups()

        return cls(
            object_key=obj['Key'],
            date=datetime.date(int(groups[0]), int(groups[1]), 1)
        )

    @classmethod
    def from_archive_path(cls, path: pathlib.Path):
        match = ARCHIVE_REGEX.match(path.name)
        if match is None:
            return None

        groups = match.groups()

        return cls(
            object_key=path.name,
            date=datetime.date(int(groups[0]), int(groups[1]), 1)
        )

    @classmethod
    def list_remote(cls):
        """
        Returns a list of HistoricalTripArchive objects that are available on
        S3 but may or may not be cached locally.
        """
        s3 = cls.__get_s3_client()
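        # Note: list_objects_v2 returns at most 1,000 keys per call; if the
        # bucket ever grows past that, this would need pagination via
        # ContinuationToken.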
        resp = s3.list_objects_v2(Bucket=TRIP_BUCKET)

        archives = [HistoricalTripArchive.from_s3(o) for o in resp['Contents']]
        return [a for a in archives if a is not None]

    @classmethod
    def list_cached(cls):
        """
        Returns a list of HistoricalTripArchive objects for all archives that
        have already been downloaded.
        """
        archives = [cls.from_archive_path(p) for p in CACHE_DIR.glob('*')]
        archives = [a for a in archives if a is not None]
        return sorted(archives, key=lambda a: a.date)

    def __fetch_cached_blob(self) -> io.BytesIO | None:
        archive_path = CACHE_DIR / self.object_key
        if not archive_path.exists():
            return None

        with open(archive_path, 'rb') as f:
            return io.BytesIO(f.read())

    def __store_blob(self, blob: io.BytesIO):
        archive_path = CACHE_DIR / self.object_key
        with open(archive_path, 'wb') as f:
            utils.logger.info(f"Storing {self.object_key} in cache")
            f.write(blob.getbuffer())

        # Rewind so the caller can read the blob from the beginning again.
        blob.seek(0)

    def fetch_df(self, normalize=True) -> pd.DataFrame:
        """
        Fetches a DataFrame of the archive. If the archive doesn't exist on
        the filesystem, it will be downloaded from S3 and cached.
        """
        blob = self.__fetch_cached_blob()
        if blob is None:
            utils.logger.info(f"Fetching {self.object_key} from S3")
            s3 = self.__get_s3_client()
            resp = s3.get_object(Bucket=TRIP_BUCKET, Key=self.object_key)
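            # Buffers the whole archive in memory before caching; monthly
            # archives are assumed to be small enough that streaming isn't
            # worth the complexity here.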
            blob = io.BytesIO(resp['Body'].read())
            self.__store_blob(blob)

        # Unzip the CSV into a dataframe
        with zipfile.ZipFile(blob, 'r') as zip_archive:
            file_list = zip_archive.namelist()
            csv_name = self.csv_name

            if csv_name not in file_list and len(file_list) != 1:
                utils.logger.error(f"Could not extract {self.csv_name}:")
                utils.logger.error(file_list)
                raise Exception(f"Could not extract {self.csv_name}")

            if csv_name not in file_list:
                csv_name = file_list[0]

            with zip_archive.open(csv_name, 'r') as csv:
                df = pd.read_csv(csv)

        if not normalize:
            return df

        # Normalize the dataframe
        if 'starttime' in df.columns:
            # Transform v1 schema
            df = df.rename(columns={
                'starttime': 'started_at',
                'stoptime': 'ended_at',
                'start station id': 'start_station_id',
                'end station id': 'end_station_id',
                'start station latitude': 'start_lat',
                'start station longitude': 'start_lng',
                'end station latitude': 'end_lat',
                'end station longitude': 'end_lng',
            })
        elif 'Start Time' in df.columns:
            # This is a weird in-between state of v1 and v2 that starts in the
            # 2016-10 dataset
            df = df.rename(columns={
                'Start Time': 'started_at',
                'Stop Time': 'ended_at',
                'Start Station ID': 'start_station_id',
                'End Station ID': 'end_station_id',
                'Start Station Latitude': 'start_lat',
                'Start Station Longitude': 'start_lng',
                'End Station Latitude': 'end_lat',
                'End Station Longitude': 'end_lng',
            })

        if 'rideable_type' not in df.columns:
            df['rideable_type'] = None

        # Keep only the columns shared across schemas, in v2 naming.
        df = df[[
            'rideable_type', 'started_at', 'ended_at', 'start_station_id',
            'end_station_id', 'start_lat', 'start_lng', 'end_lat', 'end_lng',
        ]]
        df = df.replace({np.nan: None})

        # Timestamps in the archives are in NYC time but don't differentiate
        # between EST and EDT. I'm admittedly punting here by assigning
        # ambiguous times to NaT and then dropna'ing them below. At some point
        # this will likely need to be revisited.
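        # (For example, when clocks fall back in November, 1:30 AM occurs
        # twice; a naive timestamp can't say which occurrence it means, so
        # ambiguous='NaT' makes pandas mark it NaT.)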
        df['started_at'] = pd.to_datetime(df['started_at'])\
            .dt.tz_localize(utils.TZ_NYC, ambiguous='NaT')\
            .dt.tz_convert(utils.TZ_UTC)
        df['ended_at'] = pd.to_datetime(df['ended_at'])\
            .dt.tz_localize(utils.TZ_NYC, ambiguous='NaT')\
            .dt.tz_convert(utils.TZ_UTC)

        df.index.name = 'tempid'

        return df.dropna(subset=['started_at', 'ended_at'])

    @property
    def is_downloaded(self) -> bool:
        return (CACHE_DIR / self.object_key).exists()

    @property
    def month_str(self) -> str:
        return f'{self.date.year}-{self.date.month:02d}'

    @property
    def csv_name(self) -> str:
        if self.object_key.endswith('.csv.zip'):
            return self.object_key.replace('.csv.zip', '.csv')

        return self.object_key.replace('.zip', '.csv')

    def __repr__(self):
        return f'<HistoricalTripArchive {self.date} {self.object_key} />'
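

# A minimal usage sketch: list the available archives, fetch one month, and
# print a few normalized rows. Illustrative only; assumes network access to
# the public `tripdata` bucket and a writable cache directory.
if __name__ == '__main__':
    remote = HistoricalTripArchive.list_remote()
    cached = HistoricalTripArchive.list_cached()
    print(f'{len(remote)} archives on S3, {len(cached)} cached locally')

    # Downloads and caches the archive on first use; later calls read the
    # zip straight from the cache directory.
    df = remote[0].fetch_df()
    print(df[['started_at', 'ended_at', 'start_station_id']].head())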