forerad

Utilities for collecting and analyzing Citibike data in Python
Log | Files | Refs | README

commit 3f2a48d88919bffc683098a2dbcbe39ec92ee5a6
parent 2b2aa3aa29a69050509c06b1779604ce5d0ef376
Author: Steve Gattuso <steve@stevegattuso.me>
Date:   Sun,  5 Nov 2023 16:41:43 +0100

implement first pass at daily volume rollup

Diffstat:
MREADME.md | 10+++++++++-
Abin/daily-volume-rollup | 70++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mbin/scraper | 16+++++++++-------
Mforerad/persistence.py | 125+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
Aforerad/utils.py | 25+++++++++++++++++++++++++
Amigrations/20231105_01_9c8Vj.sql | 6++++++
6 files changed, 203 insertions(+), 49 deletions(-)

diff --git a/README.md b/README.md @@ -9,8 +9,16 @@ Data collection is the hardest part. I need to: 1. Build a scraper job that fetches, cleans, and stores historical ride data. 2. Build a scraper job that fetches, cleans, and stores any relevant real-time data. 3. Build a forecasting job that combines these datasets into a 3 day prediction. - * The MVP should just take the historical average for the day of the year. 4. Build a job that calculates MAPE for previous forecasts as realtime data comes out. +5. Build some kind of dashboard that displays the predictions and actuals. + +Data to incorporate into a forecast: + +* Historical ridership per day +* Federal holiday status +* Weather + * Temperature min/max + * Precipitation probability ## Open questions * Can I estimate electric citibike revenue with the v2 dataset? diff --git a/bin/daily-volume-rollup b/bin/daily-volume-rollup @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +""" +This script handles the population of the daily_volume_rollup table +""" +import argparse +import datetime +import pandas as pd + +import forerad.persistence as persistence +import forerad.utils as utils + +store = persistence.SQLiteStore() +log = utils.get_logger() + +# The day that Citibike began publishing data on +ORIGIN_DATE = datetime.date(2013, 6, 1) + +def main__populate(month_str): + # Calculate the last day of last month + end_of_last_month = datetime.date.today().replace(day=1) - datetime.timedelta(days=1) + + # Calculate dataframe of all days since the ORIGIN_DATE + to_populate = set(pd.date_range( + start=ORIGIN_DATE, + end=end_of_last_month, + freq="D" + )) + # If provided, filter out any dates not in the specified month + if month_str is not None: + year, month = utils.parse_month_str(month_str) + to_populate = {d for d in to_populate if d.year == year and d.month == month} + + rollup = store.fetch_daily_volume_rollup() + if len(rollup) > 0: + populated_dates = set(rollup['date']) + to_populate -= populated_dates + + if 
len(to_populate) == 0: + log.info('Nothing remaining to populate. Exiting!') + return + + log.info(f"{len(to_populate)} members need to be populated") + + to_populate = sorted(list(to_populate)) + to_insert: list[tuple[datetime.date, int]] = [] + prev_month = None + for date in to_populate: + # Ratelimit logging + cur_month = date.strftime('%Y-%m') + if cur_month != prev_month: + log.info(f"Rolling up {cur_month}") + prev_month = cur_month + + trip_count = store.fetch_daily_volume(date) + to_insert.append((date, trip_count)) + + log.info(f"Writing {len(to_insert)} rows to rollup") + store.write_daily_volume_rollup(to_insert) + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Backfill historical trip data into your data store", + ) + parser.add_argument('action', choices=['populate']) + parser.add_argument('--month', help="The month to populate, in YYYY-MM format") + + args = parser.parse_args() + + if args.action == 'populate': + main__populate(args.month) diff --git a/bin/scraper b/bin/scraper @@ -2,14 +2,16 @@ """ This script allows you to download citibike ride history archives into your local database. """ -import forerad.persistence as persistence -import forerad.scrapers.historical as scrape_historical -import logging import sys import datetime import argparse -logging.getLogger().setLevel('INFO') +import forerad.persistence as persistence +import forerad.utils as utils +import forerad.scrapers.historical as scrape_historical + + +log = utils.get_logger() store = persistence.SQLiteStore() def is_persisted(archive: scrape_historical.MonthlyArchive) -> bool: @@ -32,16 +34,16 @@ def main__fetch(args: argparse.Namespace): if args.month is not None: archives = [a for a in archives if a.date.strftime('%Y-%m') == args.month] if args.month and len(archives) != 1: - logging.error(f'Month filter "{args.month}" yielded {len(archives)} results. Aborting!') + log.error(f'Month filter "{args.month}" yielded {len(archives)} results. 
Aborting!') sys.exit(1) for archive in archives: month_str = archive.date.strftime("%Y-%m") if is_persisted(archive): - logging.info(f'{month_str} is already persisted, skipping.') + log.info(f'{month_str} is already persisted, skipping.') continue - logging.info(f'Fetching and storing {month_str}') + log.info(f'Fetching and storing {month_str}') trips = scrape_historical.fetch_archive_dt(archive) store.store_trips(trips) diff --git a/forerad/persistence.py b/forerad/persistence.py @@ -13,7 +13,19 @@ class SQLiteStore(): db_path = pathlib.Path(__file__).parent.parent / 'data.sqlite3' self.db = sqlite3.connect(str(db_path)) - def fetch_trips(self, first_dt: datetime.datetime, last_dt: datetime.datetime): + def __localize_date(self, date: datetime.date) -> int: + """ + Convert a date to a localized datetime (in UNIX epoch seconds) that can + be used to query the started_at and ended_at columns. + """ + dt = datetime.datetime.combine(date, datetime.datetime.min.time()) + unix_epoch_str = TZ_NYC.localize(dt)\ + .astimezone(TZ_UTC)\ + .strftime("%s") + + return int(unix_epoch_str) + + def fetch_trips(self, start_date: datetime.date, end_date: datetime.date): """ Fetches a dataframe of all trips where started_at >= first_dt and started_at < last_dt. Note that both columns are stored in UTC. @@ -26,42 +38,10 @@ class SQLiteStore(): started_at < ? """ - # Localize, convert, and format as UNIX epoch - start = TZ_NYC.localize(first_dt)\ - .astimezone(TZ_UTC)\ - .strftime('%s') - end = TZ_NYC.localize(last_dt)\ - .astimezone(TZ_UTC)\ - .strftime('%s') - - return pd.read_sql(query, self.db, params=(start, end)) - - def __store_formatted(self, df): - """ - Store a pre-formatted dataframe (ie has all columns named to match the - historical_trips table) into the database. 
- """ - df = df[[ - 'started_at', 'ended_at', 'start_station_id', 'end_station_id', - 'start_lat', 'start_lng', 'end_lat', 'end_lng', - ]] - df = df.replace({np.nan: None}) - - # Localize timestamps, convert to UTC, then convert to UNIX epoch to - # make things speedier when querying in sqlite - df['started_at'] = pd.to_datetime(df['started_at'])\ - .dt.tz_localize(TZ_UTC)\ - .astype(int) / 10**9 - df['ended_at'] = pd.to_datetime(df['ended_at'])\ - .dt.tz_localize(TZ_UTC)\ - .astype(int) / 10**9 + start_dt = self.__localize_date(start_date) + end_dt = self.__localize_date(end_date) - df.to_sql( - name="historical_trips", - con=self.db, - if_exists='append', - index=False, - ) + return pd.read_sql(query, self.db, params=(start_dt, end_dt)) def store_trips(self, df): """ @@ -98,11 +78,74 @@ class SQLiteStore(): # The v2 dataset should work with no transformations pass - return self.__store_formatted(df) + df = df[[ + 'started_at', 'ended_at', 'start_station_id', 'end_station_id', + 'start_lat', 'start_lng', 'end_lat', 'end_lng', + ]] + df = df.replace({np.nan: None}) - def store_v2_trips(self, df): + # Localize timestamps, convert to UTC, then convert to UNIX epoch to + # make things speedier when querying in sqlite + df['started_at'] = pd.to_datetime(df['started_at'])\ + .dt.tz_localize(TZ_UTC)\ + .astype(int) / 10**9 + df['ended_at'] = pd.to_datetime(df['ended_at'])\ + .dt.tz_localize(TZ_UTC)\ + .astype(int) / 10**9 + + df.to_sql( + name="historical_trips", + con=self.db, + if_exists='append', + index=False, + ) + + def fetch_daily_volume(self, date: datetime.date) -> int: """ - Stores a dataframe of historical trips in the newer v2 schema format + Calculate the number of trips taken on a given day. """ - import pdb; pdb.set_trace() - return self.__store_formatted(df) + query = """ + SELECT + date(started_at, 'unixepoch'), + COUNT(*) + FROM historical_trips + WHERE + started_at >= ? AND + started_at < ? 
+ """ + + start_dt = self.__localize_date(date) + end_dt = self.__localize_date(date + datetime.timedelta(days=1)) + + result = pd.read_sql(query, self.db, params=(start_dt, end_dt)) + + return result.iat[0, 1] + + def fetch_daily_volume_rollup(self) -> pd.DataFrame: + """ + Returns a set of days that have an entry in the volume rollup table + """ + query = """ + SELECT + date(date, 'unixepoch') AS date, + trip_count + FROM daily_volume_rollup + """ + df = pd.read_sql(query, self.db) + df['date'] = pd.to_datetime(df['date']) + + return df + + def write_daily_volume_rollup(self, values: list[tuple[datetime.date, int]]): + """ + Writes a daily volume rollup member to the table + """ + to_insert = [(self.__localize_date(d), c) for d, c in values] + + df = pd.DataFrame(to_insert, columns=['date', 'trip_count']) + df.to_sql( + name="daily_volume_rollup", + con=self.db, + if_exists="append", + index=False, + ) diff --git a/forerad/utils.py b/forerad/utils.py @@ -0,0 +1,25 @@ +import re +import logging + +def parse_month_str(month_str: str) -> tuple[int, int]: + """ + Parses a string formatted YYYY-MM into a tuple of (year, month). Used for + parsing CLI arguments to bin/ scripts + """ + match = re.match(r"^([\d]{4})-([\d]{2})$", month_str) + if match is None: + raise Exception(f'Invalid month string: {month_str}') + + year, month = match.groups() + return (int(year), int(month)) + +def get_logger(): + logger = logging.getLogger('forerad') + + stream = logging.StreamHandler() + fmt = logging.Formatter("%(asctime)s [%(levelname)s]: %(message)s") + stream.setFormatter(fmt) + + logger.addHandler(stream) + logger.setLevel(logging.INFO) + return logger diff --git a/migrations/20231105_01_9c8Vj.sql b/migrations/20231105_01_9c8Vj.sql @@ -0,0 +1,6 @@ +-- +-- depends: 20231030_01_HoM9F-create-the-initial-schema +CREATE TABLE daily_volume_rollup ( + date NUMERIC NOT NULL PRIMARY KEY, + trip_count NUMERIC +);