commit 430aff814eff63bde364c8db6439aaec13193014
parent 0c7c12667a81efe12c32b6627de9a6a36174a789
Author: Steve Gattuso <steve@stevegattuso.me>
Date: Sun, 5 Nov 2023 11:54:41 +0100
scraper works
Diffstat:
9 files changed, 208 insertions(+), 65 deletions(-)
diff --git a/.envrc b/.envrc
@@ -1 +1,2 @@
layout python3
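+# Put the repo root on PYTHONPATH so the scripts in bin/ can import forerad.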
+export PYTHONPATH=$PYTHONPATH:.
diff --git a/.gitignore b/.gitignore
@@ -1,3 +1,5 @@
.direnv
__pycache__
.forerad-cache
+notebooks
+data.sqlite3
diff --git a/README.md b/README.md
@@ -11,3 +11,9 @@ Data collection is the hardest part. I need to:
3. Build a forecasting job that combines these datasets into a 3 day prediction.
* The MVP should just take the historical average for the day of the year.
4. Build a job that calculates MAPE for previous forecasts as realtime data comes out.
+
+## Open questions
+* Can I estimate electric Citibike revenue with the v2 dataset?
+
+## Prior art
+* [This blog post](https://medium.com/@kumbharniraj1/citi-bike-trips-analysis-and-prediction-63fcb557354d) details an attempt at this using off-the-shelf machine learning models.
diff --git a/bin/scraper b/bin/scraper
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+"""
+This script downloads Citibike ride history archives and loads them into your local database.
+"""
+import forerad.persistence as persistence
+import forerad.scrapers.historical as scrape_historical
+import logging
+import sys
+import datetime
+import argparse
+
+logging.getLogger().setLevel('INFO')
+store = persistence.SQLiteStore()
+
+def is_persisted(archive: scrape_historical.MonthlyArchive) -> bool:
+ """
+ Returns whether or not an archive is already persisted in the database.
+ """
+ first_dt = datetime.datetime.combine(archive.date, datetime.datetime.min.time())
+
+ trips = store.fetch_trips(
+ first_dt=first_dt,
+ last_dt=first_dt + datetime.timedelta(days=1)
+ )
+
+ return len(trips) > 0
+
+
+def fetch_and_store_archive(archive: scrape_historical.MonthlyArchive):
+    trips = scrape_historical.fetch_archive_dt(archive)
+    if trips is None:
+        logging.error(f'Could not read {archive.object_key}, skipping.')
+        return
+
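+    # v1 archives use the original column names, which the store renames to
+    # match the historical_trips schema; v2 columns already match.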
+ if archive.schema_version == 1:
+ store.store_v1_trips(trips)
+ else:
+ store.store_v2_trips(trips)
+
+
+def main__fetch(args: argparse.Namespace):
+ archives = scrape_historical.fetch_archives()
+
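+    # --month narrows the run to a single YYYY-MM archive; zero or multiple
+    # matches abort the run.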
+ if args.month is not None:
+ archives = [a for a in archives if a.date.strftime('%Y-%m') == args.month]
+ if args.month and len(archives) != 1:
+ logging.error(f'Month filter "{args.month}" yielded {len(archives)} results. Aborting!')
+ sys.exit(1)
+
+ for archive in archives:
+ month_str = archive.date.strftime("%Y-%m")
+ if is_persisted(archive):
+ logging.info(f'{month_str} is already persisted, skipping.')
+ continue
+
+ logging.info(f'Fetching and storing {month_str}')
+ fetch_and_store_archive(archive)
+
+
+def main__list():
+ archives = scrape_historical.fetch_archives()
+
+ print('Available archives:\n')
+ for archive in archives:
+ presence = '[x]' if is_persisted(archive) else '[ ]'
+ print(f'\t{presence} {archive.date.strftime("%Y-%m")} (v{archive.schema_version})')
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description="Backfill historical trip data into your data store",
+ )
+ parser.add_argument('action', choices=['list', 'fetch'])
+ parser.add_argument('--month', help="The month to download, in YYYY-MM format")
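+    # Usage: bin/scraper list | bin/scraper fetch [--month YYYY-MM]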
+
+ args = parser.parse_args()
+
+ if args.action == 'fetch':
+ main__fetch(args)
+ elif args.action == 'list':
+ main__list()
diff --git a/data.sqlite3 b/data.sqlite3
Binary files differ.
diff --git a/forerad/persistence.py b/forerad/persistence.py
@@ -0,0 +1,67 @@
+import sqlite3
+import pandas as pd
+import numpy as np
+import datetime
+import pathlib
+
+class SQLiteStore:
+ def __init__(self):
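+        # The sqlite file lives at the repo root, two levels up from this module.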
+ db_path = pathlib.Path(__file__).parent.parent / 'data.sqlite3'
+ self.db = sqlite3.connect(str(db_path))
+
+ def fetch_trips(self, first_dt: datetime.datetime, last_dt: datetime.datetime):
+ """
+        Fetches a dataframe of all trips with started_at >= first_dt and
+        started_at < last_dt. Note that both timestamp columns are stored in UTC.
+ """
+
+ query = """
+ SELECT *
+ FROM historical_trips
+ WHERE
+ started_at >= ? AND
+ started_at < ?
+ """
+
+ return pd.read_sql(query, self.db, params=(first_dt, last_dt))
+
+ def __store_formatted(self, df):
+ """
+        Store a pre-formatted dataframe (i.e. one whose columns already match
+        the historical_trips table) into the database.
+ """
+ df = df[[
+ 'started_at', 'ended_at', 'start_station_id', 'end_station_id',
+ 'start_lat', 'start_lng', 'end_lat', 'end_lng',
+ ]]
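+        # SQLite has no NaN value, so convert missing values to None (NULL).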
+ df = df.replace({np.nan: None})
+
+ df.to_sql(
+ name="historical_trips",
+ con=self.db,
+ if_exists='append',
+ index=False,
+ )
+
+ def store_v1_trips(self, df):
+ """
+ Stores a dataframe of historical trips in the v1 schema format
+ """
+ df = df.rename(columns={
+ 'starttime': 'started_at',
+ 'stoptime': 'ended_at',
+ 'start station id': 'start_station_id',
+ 'end station id': 'end_station_id',
+ 'start station latitude': 'start_lat',
+ 'start station longitude': 'start_lng',
+ 'end station latitude': 'end_lat',
+ 'end station longitude': 'end_lng',
+ })
+
+ return self.__store_formatted(df)
+
+ def store_v2_trips(self, df):
+ """
+ Stores a dataframe of historical trips in the newer v2 schema format
+ """
+ return self.__store_formatted(df)
diff --git a/forerad/scrapers/historical.py b/forerad/scrapers/historical.py
@@ -17,13 +17,40 @@ class MonthlyArchive():
self.object_key = object_key
self.date = date
+ @classmethod
+ def from_s3(cls, obj: dict):
+ """
+ Converts an S3 object dictionary into a MonthlyArchive record, returning
+ None if the object doesn't look like a monthly ride archive.
+ """
+        match = re.match(r"^([0-9]{4})([0-9]{2})-citibike-tripdata(\.zip|\.csv\.zip)$", obj['Key'])
+ if match is None:
+ logging.debug(f"Skipping object {obj['Key']}")
+ return None
+
+ groups = match.groups()
+
+ return cls(
+ object_key=obj['Key'],
+ date=datetime.date(int(groups[0]), int(groups[1]), 1)
+ )
+
+ def __repr__(self):
+ return f'<MonthlyArchive {self.date} {self.object_key} />'
+
+ @property
+ def schema_version(self):
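+        # Archives published as .csv.zip use the newer column layout (ride_id,
+        # started_at, ...); plain .zip archives use the original v1 columns.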
+ if '.csv.zip' in self.object_key:
+ return 2
+
+ return 1
+
@property
def csv_name(self) -> str:
- return (
- self.object_key
- .replace('.zip.csv', '.csv')
- .replace('.zip', '.csv')
- )
+ if '.csv.zip' in self.object_key:
+ return self.object_key.replace('.csv.zip', '.csv')
+
+ return self.object_key.replace('.zip', '.csv')
class ArchiveCache():
@@ -62,24 +89,6 @@ def __get_s3_client():
return boto3.client('s3', config=config)
-def __parse_object(obj: dict) -> MonthlyArchive | None:
- """
- Converts an S3 object dictionary into a MonthlyArchive record, returning
- None if the object doesn't look like a monthly ride archive.
- """
- match = re.match(r"^([0-9]{4})([0-9]{2})-citibike-tripdata((.zip$)|(.csv.zip$))", obj['Key'])
- if match is None:
- logging.debug(f"Skipping object {obj['Key']}")
- return None
-
- groups = match.groups()
-
- return MonthlyArchive(
- object_key=obj['Key'],
- date=datetime.date(int(groups[0]), int(groups[1]), 1)
- )
-
-
def fetch_archives() -> list[MonthlyArchive]:
"""
Fetches a list of archive definitions from the Citibike S3 bucket
@@ -87,11 +96,11 @@ def fetch_archives() -> list[MonthlyArchive]:
s3 = __get_s3_client()
resp = s3.list_objects_v2(Bucket=TRIP_BUCKET)
- archives = [__parse_object(o) for o in resp['Contents']]
+ archives = [MonthlyArchive.from_s3(o) for o in resp['Contents']]
return [a for a in archives if a is not None]
-def fetch_archive(archive: MonthlyArchive) -> pd.DataFrame | None:
+def fetch_archive_dt(archive: MonthlyArchive) -> pd.DataFrame | None:
"""
Fetches an archive from S3, unzips it, and parses the CSV into a dataframe.
"""
@@ -108,13 +117,17 @@ def fetch_archive(archive: MonthlyArchive) -> pd.DataFrame | None:
with zipfile.ZipFile(archive_blob, 'r') as zip_archive:
file_list = zip_archive.namelist()
+ csv_name = archive.csv_name
- if archive.csv_name not in file_list:
- logging.error(f"Could not extract {archive.object_key}:")
+ if csv_name not in file_list and len(file_list) != 1:
+ logging.error(f"Could not extract {archive.csv_name}:")
logging.error(file_list)
return None
- with zip_archive.open(archive.csv_name, 'r') as csv:
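+    # Some archives name the inner CSV differently than the zip itself; if the
+    # archive holds exactly one file, assume that file is the trip data.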
+ if csv_name not in file_list:
+ csv_name = file_list[0]
+
+ with zip_archive.open(csv_name, 'r') as csv:
ride_df = pd.read_csv(csv)
return ride_df
diff --git a/migrations/20231030_01_HoM9F-create-the-initial-schema.sql b/migrations/20231030_01_HoM9F-create-the-initial-schema.sql
@@ -1,14 +1,19 @@
-- Create the initial schema
-- depends:
-CREATE TABLE raw_trips (
- id INTEGER PRIMARY KEY,
- tripduration INTEGER NOT NULL,
- starttime TEXT NOT NULL,
- stoptime TEXT NOT NULL,
- start_station_id TEXT NOT NULL,
- end_station_id TEXT NOT NULL,
- bikeid INTEGER NOT NULL
+CREATE TABLE historical_trips (
+ ride_id INTEGER PRIMARY KEY,
+ rideable_type TEXT,
+ started_at TEXT NOT NULL,
+ ended_at TEXT NOT NULL,
+ start_lat NUMERIC,
+ start_lng NUMERIC,
+ end_lat NUMERIC,
+ end_lng NUMERIC,
+ start_station_id TEXT,
+ end_station_id TEXT
);
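+-- Trips are queried by started_at range (see SQLiteStore.fetch_trips), so
+-- index both timestamps to keep time-based scans fast.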
+CREATE INDEX historical_trips_start ON historical_trips (started_at);
+CREATE INDEX historical_trips_end ON historical_trips (ended_at);
CREATE TABLE raw_stations (
@@ -19,21 +24,3 @@ CREATE TABLE raw_stations (
lat REAL NOT NULL,
lon REAL NOT NULL
);
-
-
-CREATE TABLE cleaned_stations (
- station_id TEXT NOT NULL PRIMARY KEY,
- name TEXT,
- borough TEXT,
- capacity INTEGER,
- lat REAL NOT NULL,
- lon REAL NOT NULL
-);
-
-
-CREATE TABLE hourly_trip_rollup (
- datetime TEXT NOT NULL,
- borough TEXT NOT NULL,
- ride_count INTEGER NOT NULL,
- PRIMARY KEY (datetime, borough)
-);
diff --git a/test.py b/test.py
@@ -1,11 +0,0 @@
-import logging
-import forerad.scrapers.historical as historical
-
-logging.basicConfig(level=20)
-
-archives = historical.fetch_archives()
-print(f"Fetching {archives[0]}")
-trip_df = historical.fetch_archive(archives[0])
-
-print('Trip df:')
-print(trip_df.columns)