forerad

Utilities for collecting and analyzing Citibike data in Python

commit 711944d8bc88d9be4dc2e2c76264208080873209
parent 92ffcdb3e60f4e5472ef23cc1b4c658874119848
Author: Steve Gattuso <steve@stevegattuso.me>
Date:   Mon,  6 Nov 2023 08:29:04 +0100

dont store trips in sqlite

Diffstat:
D bin/daily-volume-rollup | 72 ------------------------------------------------------------------------
A bin/hourly-volume-rollup | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
M bin/scraper | 30 +++++-------------------------
M forerad/persistence.py | 11 +++++------
M forerad/scrapers/historical.py | 154 ++++++++++++++++++++++++++++++++++++++++++++-----------------------------
M migrations/20231030_01_HoM9F-create-the-initial-schema.sql | 27 ++-----------------------
D migrations/20231105_01_9c8Vj.sql | 6 ------
7 files changed, 195 insertions(+), 201 deletions(-)

diff --git a/bin/daily-volume-rollup b/bin/daily-volume-rollup
@@ -1,72 +0,0 @@
-#!/usr/bin/env python3
-"""
-This script handles the population of the daily_volume_rollup table
-"""
-import argparse
-import datetime
-import pandas as pd
-
-import forerad.persistence as persistence
-import forerad.utils as utils
-
-store = persistence.SQLiteStore()
-log = utils.get_logger()
-
-# The day that Citibike began publishing data on
-ORIGIN_DATE = datetime.date(2013, 6, 1)
-
-def main__populate(month_str):
-    # Calculate the last day of last month
-    most_recent_trip_dt = store.fetch_latest_trip_dt()
-    if most_recent_trip_dt is None:
-        raise Exception('No trips found!')
-
-    # Calculate dataframe of all days since the ORIGIN_DATE
-    to_populate = set(pd.date_range(
-        start=ORIGIN_DATE,
-        end=most_recent_trip_dt.date(),
-        freq="D"
-    ))
-    # If provided, filter out any dates not in the specified month
-    if month_str is not None:
-        year, month = utils.parse_month_str(month_str)
-        to_populate = {d for d in to_populate if d.year == year and d.month == month}
-
-    rollup = store.fetch_daily_volume_rollup()
-    if len(rollup) > 0:
-        populated_dates = set(rollup['date'])
-        to_populate -= populated_dates
-
-    if len(to_populate) == 0:
-        log.info('Nothing remaining to populate. Exiting!')
-        return
-
-    log.info(f"{len(to_populate)} members need to be populated")
-
-    to_populate = sorted(list(to_populate))
-    to_insert: list[tuple[datetime.date, int]] = []
-    prev_month = None
-    for date in to_populate:
-        # Ratelimit logging
-        cur_month = date.strftime('%Y-%m')
-        if cur_month != prev_month:
-            log.info(f"Rolling up {cur_month}")
-            prev_month = cur_month
-
-        trip_count = store.fetch_daily_volume(date)
-        to_insert.append((date, trip_count))
-
-    log.info(f"Writing {len(to_insert)} rows to rollup")
-    store.write_daily_volume_rollup(to_insert)
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(
-        description="Backfill historical trip data into your data store",
-    )
-    parser.add_argument('action', choices=['populate'])
-    parser.add_argument('--month', help="The month to populate, in YYYY-MM format")
-
-    args = parser.parse_args()
-
-    if args.action == 'populate':
-        main__populate(args.month)
diff --git a/bin/hourly-volume-rollup b/bin/hourly-volume-rollup
@@ -0,0 +1,96 @@
+#!/usr/bin/env python3
+"""
+This script handles the population of the daily_volume_rollup table
+"""
+import argparse
+import datetime
+import pandas as pd
+
+import forerad.scrapers.historical as historical
+import forerad.persistence as persistence
+import forerad.utils as utils
+
+store = persistence.SQLiteStore()
+log = utils.get_logger()
+
+# The day that Citibike began publishing data on
+ORIGIN_DATE = datetime.date(2013, 6, 1)
+
+def detect_missing_members(year: int, month: int) -> set[datetime.datetime]:
+    """
+    Given a month, look at the hourly rollup and generate a set of missing
+    members
+    """
+forerad/scrapers/historical
+
+
+def main__populate(month_str):
+    # Calculate the last day of last month
+    most_recent_trip_dt = store.fetch_latest_trip_dt()
+    if most_recent_trip_dt is None:
+        raise Exception('No trips found!')
+
+    # Calculate dataframe of all days since the ORIGIN_DATE
+    to_populate = set(pd.date_range(
+        start=ORIGIN_DATE,
+        end=most_recent_trip_dt.date(),
+        freq="D"
+    ))
+    # If provided, filter out any dates not in the specified month
+    if month_str is not None:
+        year, month = utils.parse_month_str(month_str)
+        to_populate = {d for d in to_populate if d.year == year and d.month == month}
+
+    rollup = store.fetch_daily_volume_rollup()
+    if len(rollup) > 0:
+        populated_dates = set(rollup['date'])
+        to_populate -= populated_dates
+
+    if len(to_populate) == 0:
+        log.info('Nothing remaining to populate. Exiting!')
+        return
+
+    log.info(f"{len(to_populate)} members need to be populated")
+
+    to_populate = sorted(list(to_populate))
+    to_insert: list[tuple[datetime.date, int]] = []
+    prev_month = None
+    for date in to_populate:
+        # Ratelimit logging
+        cur_month = date.strftime('%Y-%m')
+        if cur_month != prev_month:
+            log.info(f"Rolling up {cur_month}")
+            prev_month = cur_month
+
+        trip_count = store.fetch_daily_volume(date)
+        to_insert.append((date, trip_count))
+
+    log.info(f"Writing {len(to_insert)} rows to rollup")
+    store.write_daily_volume_rollup(to_insert)
+
+
+def main__status():
+    cache = historical.ArchiveCache()
+    available_archives = cache.list_archives()
+    for archive in available_archives:
+        first_date = archive.date
+        last_date = (archive.date + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(1)
+        pd.date_range(
+            start=first_date,
+            end=last_date,
+        )
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description="Backfill historical trip data into your data store",
+    )
+    parser.add_argument('action', choices=['status', 'populate'])
+    parser.add_argument('--month', help="The month to populate, in YYYY-MM format")
+
+    args = parser.parse_args()
+
+    if args.action == 'populate':
+        main__populate(args.month)
+    elif args.action == 'status':
+        main__status()
diff --git a/bin/scraper b/bin/scraper
@@ -3,7 +3,6 @@
 This script allows you to download citibike ride history archives into your
 local database.
 """
 import sys
-import datetime
 import argparse
 import pandas as pd
@@ -15,26 +14,9 @@ import forerad.scrapers.historical as scrape_historical
 log = utils.get_logger()
 store = persistence.SQLiteStore()

-def is_persisted(archive: scrape_historical.MonthlyArchive) -> bool:
-    """
-    Returns whether or not an archive is already persisted in the database.
-    """
-    next_month = (archive.date.replace(day=1) + datetime.timedelta(days=32)).replace(day=1)
-    all_days = pd.date_range(
-        start=archive.date,
-        end=next_month - datetime.timedelta(days=1),
-        freq="D"
-    )
-
-    for date in all_days:
-        if store.fetch_daily_volume(date) == 0:
-            return False
-
-    return True
-
 def main__fetch(args: argparse.Namespace):
-    archives = scrape_historical.fetch_archives()
+    archives = scrape_historical.fetch_remote_archives()

     if args.month is not None:
         archives = [a for a in archives if a.date.strftime('%Y-%m') == args.month]
@@ -44,22 +26,20 @@ def main__fetch(args: argparse.Namespace):
     for archive in archives:
         month_str = archive.date.strftime("%Y-%m")

-        if is_persisted(archive):
+        if archive.is_downloaded:
            log.info(f'{month_str} is already persisted, skipping.')
            continue

        log.info(f'Fetching and storing {month_str}')
-
-        trips = scrape_historical.fetch_archive_dt(archive)
-        store.store_trips(trips)
+        archive.fetch_df()


 def main__list():
-    archives = scrape_historical.fetch_archives()
+    archives = scrape_historical.fetch_remote_archives()

     print('Available archives:\n')

     for archive in archives:
-        presence = '[x]' if is_persisted(archive) else '[ ]'
+        presence = '[x]' if archive.is_downloaded else '[ ]'
         print(f'\t{presence} {archive.date.strftime("%Y-%m")}')
diff --git a/forerad/persistence.py b/forerad/persistence.py
@@ -59,7 +59,6 @@ class SQLiteStore():
         Stores a dataframe of historical trips, transforming the data depending
         on the schema it detects
         """
-
         if 'starttime' in df.columns:
             # Transform v1 schema
             df = df.rename(columns={
@@ -85,13 +84,13 @@ class SQLiteStore():
                 'End Station Latitude': 'end_lat',
                 'End Station Longitude': 'end_lng',
             })
-        else:
-            # The v2 dataset should work with no transformations
-            pass
+
+        if 'rideable_type' not in df.columns:
+            df['rideable_type'] = None

         df = df[[
-            'started_at', 'ended_at', 'start_station_id', 'end_station_id',
-            'start_lat', 'start_lng', 'end_lat', 'end_lng',
+            'rideable_type', 'started_at', 'ended_at', 'start_station_id',
+            'end_station_id', 'start_lat', 'start_lng', 'end_lat', 'end_lng',
         ]]

         df = df.replace({np.nan: None})
diff --git a/forerad/scrapers/historical.py b/forerad/scrapers/historical.py
@@ -1,6 +1,5 @@
 import re
 import datetime
-import logging
 import zipfile
 import io
 import boto3
@@ -9,7 +8,22 @@
 import pandas as pd
 import botocore.client as bclient
 from botocore import UNSIGNED

-class MonthlyArchive():
+import forerad.utils as utils
+
+log = utils.get_logger()
+ARCHIVE_REGEX = re.compile("^([0-9]{4})([0-9]{2})-citibike-tripdata((.zip$)|(.csv.zip$))")
+CACHE_DIR = pathlib.Path('.forerad-cache')
+TRIP_BUCKET = 'tripdata'
+if not CACHE_DIR.exists():
+    CACHE_DIR.mkdir()
+    log.debug('Initializing .cache dir')
+
+def __get_s3_client():
+    config = bclient.Config(signature_version=UNSIGNED)
+    return boto3.client('s3', config=config)
+
+
+class HistoricalTripArchive():
     object_key: str
     date: datetime.date
@@ -20,12 +34,12 @@ class MonthlyArchive():
     @classmethod
     def from_s3(cls, obj: dict):
         """
-        Converts an S3 object dictionary into a MonthlyArchive record, returning
+        Converts an S3 object dictionary into a HistoricalTripArchive record, returning
         None if the object doesn't look like a monthly ride archive.
         """
-        match = re.match(r"^([0-9]{4})([0-9]{2})-citibike-tripdata((.zip$)|(.csv.zip$))", obj['Key'])
+        match = ARCHIVE_REGEX.match(obj['Key'])
         if match is None:
-            logging.debug(f"Skipping object {obj['Key']}")
+            log.error(f"Skipping object {obj['Key']}")
             return None

         groups = match.groups()
@@ -35,92 +49,94 @@
             date=datetime.date(int(groups[0]), int(groups[1]), 1)
         )

-    def __repr__(self):
-        return f'<MonthlyArchive {self.date} {self.object_key} />'
-
-    @property
-    def csv_name(self) -> str:
-        if '.csv.zip' in self.object_key:
-            return self.object_key.replace('.csv.zip', '.csv')
-
-        return self.object_key.replace('.zip', '.csv')
-
+    @classmethod
+    def from_archive_path(cls, path: pathlib.Path):
+        match = ARCHIVE_REGEX.match(path.name)
+        if match is None:
+            return None

-class ArchiveCache():
-    def __init__(self):
-        self.cache_dir = pathlib.Path('.forerad-cache')
-        self.__ensure_cache()
+        groups = match.groups()

-    def __ensure_cache(self):
-        if not self.cache_dir.exists():
-            self.cache_dir.mkdir()
-            logging.debug('Initializing .cache dir')
-            return None
+        return cls(
+            object_key=path.name,
+            date=datetime.date(int(groups[0]), int(groups[1]), 1)
+        )

-    def get_archive(self, archive: MonthlyArchive):
-        archive_path = self.cache_dir / archive.object_key
+    @classmethod
+    def list_cached(cls):
+        """
+        Returns a list of HistoricalTripArchive objects for all archives that have already
+        been downloaded
+        """
+        unfiltered = [cls.from_archive_path(p) for p in CACHE_DIR.glob('*')]
+        return [a for a in unfiltered if a is not None]
+
+    def __fetch_cached_blob(self) -> io.BytesIO | None:
+        archive_path = CACHE_DIR / self.object_key
         if not archive_path.exists():
             return None

         with open(archive_path, 'rb') as f:
-            logging.info(f"Loading {archive.object_key} from cache")
+            log.info(f"Loading {self.object_key} from cache")
             return io.BytesIO(f.read())

-    def store_archive(self, archive: MonthlyArchive, blob):
-        archive_path = self.cache_dir / archive.object_key
+    def __store_blob(self, blob: io.BytesIO):
+        archive_path = CACHE_DIR / self.object_key

         with open(archive_path, 'wb') as f:
-            logging.info(f"Storing {archive.object_key} in cache")
+            log.info(f"Storing {self.object_key} in cache")
             f.write(blob.getbuffer())
             blob.seek(0)

-TRIP_BUCKET = 'tripdata'
-
+    def fetch_df(self) -> pd.DataFrame:
+        """
+        Fetches a DataFrame of the archive. If the archive doesn't exist on the
+        filesystem it will attempt to download it from S3.
+        """
+        blob = self.__fetch_cached_blob()
+        if blob is None:
+            log.info(f"Fetching {self.csv_name} from S3")
+            s3 = __get_s3_client()
+            resp = s3.get_object(Bucket=TRIP_BUCKET, Key=self.object_key)
+            blob = io.BytesIO(resp['Body'].read())
+            self.__store_blob(blob)

-def __get_s3_client():
-    config = bclient.Config(signature_version=UNSIGNED)
-    return boto3.client('s3', config=config)
+        with zipfile.ZipFile(blob, 'r') as zip_archive:
+            file_list = zip_archive.namelist()
+            csv_name = self.csv_name
+            if csv_name not in file_list and len(file_list) != 1:
+                log.error(f"Could not extract {self.csv_name}:")
+                log.error(file_list)
+                raise Exception("Could not extract {self.csv_name}")

-def fetch_archives() -> list[MonthlyArchive]:
-    """
-    Fetches a list of archive definitions from the Citibike S3 bucket
-    """
-    s3 = __get_s3_client()
-    resp = s3.list_objects_v2(Bucket=TRIP_BUCKET)
-
-    archives = [MonthlyArchive.from_s3(o) for o in resp['Contents']]
-    return [a for a in archives if a is not None]
+            if csv_name not in file_list:
+                csv_name = file_list[0]
+            with zip_archive.open(csv_name, 'r') as csv:
+                return pd.read_csv(csv)

-def fetch_archive_dt(archive: MonthlyArchive) -> pd.DataFrame | None:
-    """
-    Fetches an archive from S3 and unzips the contents into a readable blob.
-    """
-    cache = ArchiveCache()
-
-    archive_blob = cache.get_archive(archive)
-    if archive_blob is None:
-        logging.info(f"Fetching {archive.csv_name} from S3")
-        s3 = __get_s3_client()
-        resp = s3.get_object(Bucket=TRIP_BUCKET, Key=archive.object_key)
-        archive_blob = io.BytesIO(resp['Body'].read())
+    @property
+    def is_downloaded(self) -> bool:
+        return (CACHE_DIR / self.object_key).exists()

-        cache.store_archive(archive, archive_blob)
+    @property
+    def csv_name(self) -> str:
+        if '.csv.zip' in self.object_key:
+            return self.object_key.replace('.csv.zip', '.csv')

-    with zipfile.ZipFile(archive_blob, 'r') as zip_archive:
-        file_list = zip_archive.namelist()
-        csv_name = archive.csv_name
+        return self.object_key.replace('.zip', '.csv')

-        if csv_name not in file_list and len(file_list) != 1:
-            logging.error(f"Could not extract {archive.csv_name}:")
-            logging.error(file_list)
-            return None
+    def __repr__(self):
+        return f'<HistoricalTripArchive {self.date} {self.object_key} />'

-        if csv_name not in file_list:
-            csv_name = file_list[0]
-        with zip_archive.open(csv_name, 'r') as csv:
-            ride_df = pd.read_csv(csv)
+def fetch_remote_archives() -> list[HistoricalTripArchive]:
+    """
+    Fetches a list of archive definitions from the Citibike S3 bucket
+    """
+    s3 = __get_s3_client()
+    resp = s3.list_objects_v2(Bucket=TRIP_BUCKET)

-    return ride_df
+    archives = [HistoricalTripArchive.from_s3(o) for o in resp['Contents']]
+    return [a for a in archives if a is not None]
diff --git a/migrations/20231030_01_HoM9F-create-the-initial-schema.sql b/migrations/20231030_01_HoM9F-create-the-initial-schema.sql
@@ -1,26 +1,7 @@
 -- Create the initial schema
 -- depends:
-CREATE TABLE historical_trips (
-    ride_id INTEGER PRIMARY KEY,
-    rideable_type TEXT,
-    started_at NUMERIC NOT NULL,
-    ended_at NUMERIC NOT NULL,
-    start_lat NUMERIC,
-    start_lng NUMERIC,
-    end_lat NUMERIC,
-    end_lng NUMERIC,
-    start_station_id TEXT,
-    end_station_id TEXT
-);
-CREATE INDEX historical_trips_start ON historical_trips (started_at);
-CREATE INDEX historical_trips_end ON historical_trips (ended_at);
-
-
-CREATE TABLE raw_stations (
-    station_id TEXT NOT NULL PRIMARY KEY,
-    name TEXT,
-    short_name TEXT,
-    capacity INTEGER,
-    lat REAL NOT NULL,
-    lon REAL NOT NULL
+CREATE TABLE hourly_volume_rollup (
+    datetime NUMERIC NOT NULL PRIMARY KEY,
+    trip_count NUMERIC,
+    calculated_at NUMERIC
 );
diff --git a/migrations/20231105_01_9c8Vj.sql b/migrations/20231105_01_9c8Vj.sql
@@ -1,6 +0,0 @@
---
--- depends: 20231030_01_HoM9F-create-the-initial-schema
-CREATE TABLE daily_volume_rollup (
-    date NUMERIC NOT NULL PRIMARY KEY,
-    trip_count NUMERIC
-);