historical.py (7061B)
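# Listing, caching, and normalization of the monthly Citi Bike trip archives
# published in the public `tripdata` S3 bucket. Zips are cached on disk in
# `.forerad-cache` and each CSV is reshaped into a single v2-style schema.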
import re
import datetime
import zipfile
import io
import boto3
import pathlib
import pandas as pd
import numpy as np
import botocore.client as bclient
from botocore import UNSIGNED

import forerad.utils as utils

# Notice the `(i)?` - they spelled Citibike wrong in one of the archives... lol
ARCHIVE_REGEX = re.compile(r"^([0-9]{4})([0-9]{2})-cit(i)?bike-tripdata((\.zip$)|(\.csv\.zip$))")
CACHE_DIR = pathlib.Path(__file__).parent.parent.parent / pathlib.Path('.forerad-cache')
TRIP_BUCKET = 'tripdata'
if not CACHE_DIR.exists():
    CACHE_DIR.mkdir()
    utils.logger.debug('Initializing .forerad-cache dir')
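# A few illustrative keys (hypothetical examples, not pulled from the bucket)
# showing what ARCHIVE_REGEX accepts; anything else, such as the `JC-`-prefixed
# Jersey City archives, is skipped:
#
#   ARCHIVE_REGEX.match('201306-citibike-tripdata.zip')         # match
#   ARCHIVE_REGEX.match('202101-citibike-tripdata.csv.zip')     # match
#   ARCHIVE_REGEX.match('201307-citbike-tripdata.zip')          # match (the misspelling)
#   ARCHIVE_REGEX.match('JC-202101-citibike-tripdata.csv.zip')  # None (not anchored at ^)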
class HistoricalTripArchive():
    object_key: str
    date: datetime.date

    def __init__(self, object_key, date):
        self.object_key = object_key
        self.date = date

    @classmethod
    def __get_s3_client(cls):
        config = bclient.Config(signature_version=UNSIGNED)
        return boto3.client('s3', config=config)

    @classmethod
    def from_s3(cls, obj: dict):
        """
        Converts an S3 object dictionary into a HistoricalTripArchive record,
        returning None if the object doesn't look like a monthly ride archive.
        """
        match = ARCHIVE_REGEX.match(obj['Key'])
        if match is None:
            utils.logger.error(f"Skipping object {obj['Key']}")
            return None

        groups = match.groups()

        return cls(
            object_key=obj['Key'],
            date=datetime.date(int(groups[0]), int(groups[1]), 1)
        )

    @classmethod
    def from_archive_path(cls, path: pathlib.Path):
        match = ARCHIVE_REGEX.match(path.name)
        if match is None:
            return None

        groups = match.groups()

        return cls(
            object_key=path.name,
            date=datetime.date(int(groups[0]), int(groups[1]), 1)
        )

    @classmethod
    def list_remote(cls):
        """
        Returns a list of HistoricalTripArchive objects that are available on
        S3 but may or may not be cached locally.
        """
        s3 = cls.__get_s3_client()
        resp = s3.list_objects_v2(Bucket=TRIP_BUCKET)

        archives = [HistoricalTripArchive.from_s3(o) for o in resp['Contents']]
        return [a for a in archives if a is not None]

    @classmethod
    def list_cached(cls):
        """
        Returns a list of HistoricalTripArchive objects for all archives that
        have already been downloaded.
        """
        archives = [cls.from_archive_path(p) for p in CACHE_DIR.glob('*')]
        archives = [a for a in archives if a is not None]
        return sorted(archives, key=lambda a: a.date)

    def __fetch_cached_blob(self) -> io.BytesIO | None:
        archive_path = CACHE_DIR / self.object_key
        if not archive_path.exists():
            return None

        with open(archive_path, 'rb') as f:
            return io.BytesIO(f.read())

    def __store_blob(self, blob: io.BytesIO):
        archive_path = CACHE_DIR / self.object_key
        with open(archive_path, 'wb') as f:
            utils.logger.info(f"Storing {self.object_key} in cache")
            f.write(blob.getbuffer())

        blob.seek(0)

    def fetch_df(self, normalize=True) -> pd.DataFrame:
        """
        Fetches a DataFrame of the archive. If the archive doesn't exist on
        the filesystem it will attempt to download it from S3.
        """
        blob = self.__fetch_cached_blob()
        if blob is None:
            utils.logger.info(f"Fetching {self.csv_name} from S3")
            s3 = self.__get_s3_client()
            resp = s3.get_object(Bucket=TRIP_BUCKET, Key=self.object_key)
            blob = io.BytesIO(resp['Body'].read())
            self.__store_blob(blob)

        # Unzip the CSV into a dataframe
        with zipfile.ZipFile(blob, 'r') as zip_archive:
            file_list = zip_archive.namelist()
            csv_name = self.csv_name

            if csv_name not in file_list and len(file_list) != 1:
                utils.logger.error(f"Could not extract {self.csv_name}:")
                utils.logger.error(file_list)
                raise Exception(f"Could not extract {self.csv_name}")

            if csv_name not in file_list:
                csv_name = file_list[0]

            with zip_archive.open(csv_name, 'r') as csv:
                df = pd.read_csv(csv)

        if not normalize:
            return df

        # Normalize the dataframe
        if 'starttime' in df.columns:
            # Transform v1 schema
            df = df.rename(columns={
                'starttime': 'started_at',
                'stoptime': 'ended_at',
                'start station id': 'start_station_id',
                'end station id': 'end_station_id',
                'start station latitude': 'start_lat',
                'start station longitude': 'start_lng',
                'end station latitude': 'end_lat',
                'end station longitude': 'end_lng',
            })
        elif 'Start Time' in df.columns:
            # This is a weird in-between state of v1 and v2 that starts in the
            # 2016-10 dataset
            df = df.rename(columns={
                'Start Time': 'started_at',
                'Stop Time': 'ended_at',
                'Start Station ID': 'start_station_id',
                'End Station ID': 'end_station_id',
                'Start Station Latitude': 'start_lat',
                'Start Station Longitude': 'start_lng',
                'End Station Latitude': 'end_lat',
                'End Station Longitude': 'end_lng',
            })

        if 'rideable_type' not in df.columns:
            df['rideable_type'] = None

        df = df[[
            'rideable_type', 'started_at', 'ended_at', 'start_station_id',
            'end_station_id', 'start_lat', 'start_lng', 'end_lat', 'end_lng',
        ]]
        df = df.replace({np.nan: None})

        # Timestamps in the archives are in NYC time but don't differentiate
        # between EST and EDT. I'm admittedly punting here by assigning
        # ambiguous times to NaT and then dropna'ing them below. At some point
        # this will likely need to be revisited.
        df['started_at'] = pd.to_datetime(df['started_at'])\
            .dt.tz_localize(utils.TZ_NYC, ambiguous='NaT')\
            .dt.tz_convert(utils.TZ_UTC)
        df['ended_at'] = pd.to_datetime(df['ended_at'])\
            .dt.tz_localize(utils.TZ_NYC, ambiguous='NaT')\
            .dt.tz_convert(utils.TZ_UTC)

        df.index.name = 'tempid'

        return df.dropna(subset=['started_at', 'ended_at'])

    @property
    def is_downloaded(self) -> bool:
        return (CACHE_DIR / self.object_key).exists()

    @property
    def month_str(self) -> str:
        return f'{self.date.year}-{self.date.month:02d}'

    @property
    def csv_name(self) -> str:
        if '.csv.zip' in self.object_key:
            return self.object_key.replace('.csv.zip', '.csv')

        return self.object_key.replace('.zip', '.csv')

    def __repr__(self):
        return f'<HistoricalTripArchive {self.date} {self.object_key} />'
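
# A minimal usage sketch (an assumption about how the module is meant to be
# driven, not part of the original file; the `forerad.historical` import path
# is a guess based on the filename and the `forerad.utils` import above):
#
#   from forerad.historical import HistoricalTripArchive
#
#   archives = HistoricalTripArchive.list_remote()
#   june = next(a for a in archives if a.month_str == '2019-06')
#   df = june.fetch_df()        # downloads to .forerad-cache on first call
#   print(df[['started_at', 'ended_at']].head())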