commit 6da36f7e4f10e281122a50db81de2b28bffad452
parent 711944d8bc88d9be4dc2e2c76264208080873209
Author: Steve Gattuso <steve@stevegattuso.me>
Date: Mon, 6 Nov 2023 10:20:11 +0100
calculate hourly volume rollup
Diffstat:
4 files changed, 126 insertions(+), 174 deletions(-)
diff --git a/bin/hourly-volume-rollup b/bin/hourly-volume-rollup
@@ -16,69 +16,54 @@ log = utils.get_logger()
# The day that Citibike began publishing data on
ORIGIN_DATE = datetime.date(2013, 6, 1)
-def detect_missing_members(year: int, month: int) -> set[datetime.datetime]:
+def detect_status(year: int, month: int) -> tuple[set[pd.Timestamp], set[pd.Timestamp], set[pd.Timestamp]]:
"""
- Given a month, look at the hourly rollup and generate a set of missing
- members
+ Given a month, compare the hourly volume rollup against the expected
+ hourly members and return the (expected, actual, missing) sets
"""
-
-
-def main__populate(month_str):
- # Calculate the last day of last month
- most_recent_trip_dt = store.fetch_latest_trip_dt()
- if most_recent_trip_dt is None:
- raise Exception('No trips found!')
-
- # Calculate dataframe of all days since the ORIGIN_DATE
- to_populate = set(pd.date_range(
- start=ORIGIN_DATE,
- end=most_recent_trip_dt.date(),
- freq="D"
+ first_day = datetime.date(year, month, 1)
+ next_month = utils.next_month(first_day.year, first_day.month)
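+ # Every hour in [first_day, next_month); inclusive="left" excludes the next month's first hour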
+ expected_members = set(pd.date_range(
+ start=first_day,
+ end=next_month,
+ freq="1H",
+ inclusive="left"
))
- # If provided, filter out any dates not in the specified month
- if month_str is not None:
- year, month = utils.parse_month_str(month_str)
- to_populate = {d for d in to_populate if d.year == year and d.month == month}
- rollup = store.fetch_daily_volume_rollup()
- if len(rollup) > 0:
- populated_dates = set(rollup['date'])
- to_populate -= populated_dates
+ rollup = store.fetch_hourly_volume_rollup(first_day, next_month)
+ # Iterating the DatetimeIndex yields pd.Timestamps, matching expected_members
+ actual_members = set(rollup.index)
- if len(to_populate) == 0:
- log.info('Nothing remaining to populate. Exiting!')
- return
+ missing = expected_members - actual_members
+ return expected_members, actual_members, missing
- log.info(f"{len(to_populate)} members need to be populated")
- to_populate = sorted(list(to_populate))
- to_insert: list[tuple[datetime.date, int]] = []
- prev_month = None
- for date in to_populate:
- # Ratelimit logging
- cur_month = date.strftime('%Y-%m')
- if cur_month != prev_month:
- log.info(f"Rolling up {cur_month}")
- prev_month = cur_month
+def derive_rollup(a: historical.HistoricalTripArchive):
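+ """
+ Derives the hourly trip counts for a single archive and writes them to
+ the hourly_volume_rollup table
+ """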
+ df = a.fetch_df()[['started_at']].reset_index()
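+ # reset_index() exposes the tempid index as a column for count() to tally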
+ grouper = pd.Grouper(key='started_at', freq='1H')
+ df = df.groupby(grouper).count()\
+ .rename(columns={'tempid': 'trip_count'})\
+ .reset_index()
- trip_count = store.fetch_daily_volume(date)
- to_insert.append((date, trip_count))
+ store.write_hourly_volume_rollup(df)
+ log.info(f"Wrote {len(df)} members to table")
- log.info(f"Writing {len(to_insert)} rows to rollup")
- store.write_daily_volume_rollup(to_insert)
+def main__populate(month_str):
+ archives = historical.HistoricalTripArchive.list_cached()
+ if month_str is not None:
+ year, month = utils.parse_month_str(month_str)
+ archives = [a for a in archives if a.date.year == year and a.date.month == month]
+
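+ # Derive and write a rollup for each matching cached archive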
+ for a in archives:
+ derive_rollup(a)
def main__status():
- cache = historical.ArchiveCache()
- available_archives = cache.list_archives()
+ available_archives = historical.HistoricalTripArchive.list_cached()
+
+ print('Archive status:')
for archive in available_archives:
- first_date = archive.date
- last_date = (archive.date + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(1)
- pd.date_range(
- start=first_date,
- end=last_date,
- )
+ expected, actual, missing = detect_status(archive.date.year, archive.date.month)
+ presence = '[x]' if len(missing) == 0 else '[ ]'
+ print(f'\t{presence} {archive.date.year}-{archive.date.month:02d} ({len(actual)}/{len(expected)})')
if __name__ == '__main__':
diff --git a/forerad/persistence.py b/forerad/persistence.py
@@ -5,8 +5,7 @@ import datetime
import pathlib
import pytz
-TZ_NYC = pytz.timezone('America/New_York')
-TZ_UTC = pytz.timezone('UTC')
+import forerad.utils as utils
class SQLiteStore():
def __init__(self):
@@ -19,142 +18,46 @@ class SQLiteStore():
be used to query the started_at and ended_at columns.
"""
dt = datetime.datetime.combine(date, datetime.datetime.min.time())
- unix_epoch_str = TZ_NYC.localize(dt)\
- .astimezone(TZ_UTC)\
+ unix_epoch_str = utils.TZ_NYC.localize(dt)\
+ .astimezone(utils.TZ_UTC)\
.strftime("%s")
return int(unix_epoch_str)
- def fetch_trips(self, start_date: datetime.date, end_date: datetime.date):
- """
- Fetches a dataframe of all trips where started_at >= first_dt and
- started_at < last_dt. Note that both columns are stored in UTC.
- """
+
+ def fetch_hourly_volume_rollup(self, start_date: datetime.date, end_date: datetime.date) -> pd.DataFrame:
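+ """
+ Fetches hourly rollup rows with datetime >= start_date and
+ datetime < end_date, indexed by datetime
+ """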
query = """
- SELECT *
- FROM historical_trips
+ SELECT
+ datetime(datetime, 'unixepoch') AS datetime,
+ trip_count
+ FROM hourly_volume_rollup
WHERE
- started_at >= ? AND
- started_at < ?
+ datetime >= ? AND
+ datetime < ?
"""
start_dt = self.__localize_date(start_date)
end_dt = self.__localize_date(end_date)
- return pd.read_sql(query, self.db, params=(start_dt, end_dt))
-
- def fetch_latest_trip_dt(self) -> datetime.datetime | None:
- query = "SELECT MAX(started_at) FROM historical_trips"
- result = pd.read_sql(query, self.db).iat[0, 0]
+ results = pd.read_sql(query, self.db, params=(start_dt, end_dt))
+ # Stored values are UTC epochs; convert back to naive NYC wall times so
+ # they line up with the naive hourly timestamps callers compare against
+ results['datetime'] = pd.to_datetime(results['datetime'], utc=True)\
+ .dt.tz_convert(utils.TZ_NYC)\
+ .dt.tz_localize(None)
+ return results.set_index('datetime')
- if result is None:
- return None
- return TZ_UTC\
- .localize(datetime.datetime.fromtimestamp(int(float(result))))\
- .astimezone(TZ_NYC)
-
- def store_trips(self, df):
- """
- Stores a dataframe of historical trips, transforming the data depending
- on the schema it detects
- """
- if 'starttime' in df.columns:
- # Transform v1 schema
- df = df.rename(columns={
- 'starttime': 'started_at',
- 'stoptime': 'ended_at',
- 'start station id': 'start_station_id',
- 'end station id': 'end_station_id',
- 'start station latitude': 'start_lat',
- 'start station longitude': 'start_lng',
- 'end station latitude': 'end_lat',
- 'end station longitude': 'end_lng',
- })
- elif 'Start Time' in df.columns:
- # This is a weird in-between state of v1 and v2 that starts in the
- # 2016-10 dataset
- df = df.rename(columns={
- 'Start Time': 'started_at',
- 'Stop Time': 'ended_at',
- 'Start Station ID': 'start_station_id',
- 'End Station ID': 'end_station_id',
- 'Start Station Latitude': 'start_lat',
- 'Start Station Longitude': 'start_lng',
- 'End Station Latitude': 'end_lat',
- 'End Station Longitude': 'end_lng',
- })
-
- if 'rideable_type' not in df.columns:
- df['rideable_type'] = None
-
- df = df[[
- 'rideable_type', 'started_at', 'ended_at', 'start_station_id',
- 'end_station_id', 'start_lat', 'start_lng', 'end_lat', 'end_lng',
- ]]
- df = df.replace({np.nan: None})
-
- # Localize timestamps, convert to UTC, then convert to UNIX epoch to
- # make things speedier when querying in sqlite
- df['started_at'] = pd.to_datetime(df['started_at'])\
- .dt.tz_localize(TZ_UTC)\
- .astype(int) / 10**9
- df['ended_at'] = pd.to_datetime(df['ended_at'])\
- .dt.tz_localize(TZ_UTC)\
- .astype(int) / 10**9
-
- df.to_sql(
- name="historical_trips",
- con=self.db,
- if_exists='append',
- index=False,
- )
-
- def fetch_daily_volume(self, date: datetime.date) -> int:
- """
- Calculate the number of trips taken on a given day.
+ def write_hourly_volume_rollup(self, values: pd.DataFrame):
"""
- query = """
- SELECT
- date(started_at, 'unixepoch'),
- COUNT(*)
- FROM historical_trips
- WHERE
- started_at >= ? AND
- started_at < ?
+ Writes hourly volume rollup members to the table
"""
+ values['started_at'] = values['started_at'].astype('int64') // 10**9
- start_dt = self.__localize_date(date)
- end_dt = self.__localize_date(date + datetime.timedelta(days=1))
-
- result = pd.read_sql(query, self.db, params=(start_dt, end_dt))
- return result.iat[0, 1]
-
- def fetch_daily_volume_rollup(self) -> pd.DataFrame:
- """
- Returns a set of days that have an entry in the volume rollup table
- """
query = """
- SELECT
- date(date, 'unixepoch') AS date,
- trip_count
- FROM daily_volume_rollup
- """
- df = pd.read_sql(query, self.db)
- df['date'] = pd.to_datetime(df['date'])
-
- return df
-
- def write_daily_volume_rollup(self, values: list[tuple[datetime.date, int]]):
- """
- Writes a daily volume rollup member to the table
+ INSERT OR REPLACE INTO hourly_volume_rollup (
+ datetime, trip_count, calculated_at
+ ) VALUES (
+ :started_at, :trip_count, UNIXEPOCH()
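+ -- UNIXEPOCH() needs SQLite 3.38+; calculated_at records when this
+ -- rollup member was computed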
+ )
"""
- to_insert = [(self.__localize_date(d), c) for d, c in values]
- df = pd.DataFrame(to_insert, columns=['date', 'trip_count'])
- df.to_sql(
- name="daily_volume_rollup",
- con=self.db,
- if_exists="append",
- index=False,
- )
+ cur = self.db.cursor()
+ cur.executemany(query, values.to_dict('records'))
+ self.db.commit()
diff --git a/forerad/scrapers/historical.py b/forerad/scrapers/historical.py
@@ -5,6 +5,7 @@ import io
import boto3
import pathlib
import pandas as pd
+import numpy as np
import botocore.client as bclient
from botocore import UNSIGNED
@@ -12,7 +13,7 @@ import forerad.utils as utils
log = utils.get_logger()
ARCHIVE_REGEX = re.compile("^([0-9]{4})([0-9]{2})-citibike-tripdata((.zip$)|(.csv.zip$))")
-CACHE_DIR = pathlib.Path('.forerad-cache')
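+# Resolve the cache directory relative to the repository root rather than the CWD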
+CACHE_DIR = pathlib.Path(__file__).parent.parent.parent / '.forerad-cache'
TRIP_BUCKET = 'tripdata'
if not CACHE_DIR.exists():
CACHE_DIR.mkdir()
@@ -68,8 +69,9 @@ class HistoricalTripArchive():
Returns a list of HistoricalTripArchive objects for all archives that have already
been downloaded
"""
- unfiltered = [cls.from_archive_path(p) for p in CACHE_DIR.glob('*')]
- return [a for a in unfiltered if a is not None]
+ archives = [cls.from_archive_path(p) for p in CACHE_DIR.glob('*')]
+ archives = [a for a in archives if a is not None]
+ return sorted(archives, key=lambda a: a.date)
def __fetch_cached_blob(self) -> io.BytesIO | None:
archive_path = CACHE_DIR / self.object_key
@@ -101,6 +103,7 @@ class HistoricalTripArchive():
blob = io.BytesIO(resp['Body'].read())
self.__store_blob(blob)
+ # Unzip the CSV into a dataframe
with zipfile.ZipFile(blob, 'r') as zip_archive:
file_list = zip_archive.namelist()
csv_name = self.csv_name
@@ -114,7 +117,56 @@ class HistoricalTripArchive():
csv_name = file_list[0]
with zip_archive.open(csv_name, 'r') as csv:
- return pd.read_csv(csv)
+ df = pd.read_csv(csv)
+
+ # Normalize the dataframe
+ if 'starttime' in df.columns:
+ # Transform v1 schema
+ df = df.rename(columns={
+ 'starttime': 'started_at',
+ 'stoptime': 'ended_at',
+ 'start station id': 'start_station_id',
+ 'end station id': 'end_station_id',
+ 'start station latitude': 'start_lat',
+ 'start station longitude': 'start_lng',
+ 'end station latitude': 'end_lat',
+ 'end station longitude': 'end_lng',
+ })
+ elif 'Start Time' in df.columns:
+ # This is a weird in-between state of v1 and v2 that starts in the
+ # 2016-10 dataset
+ df = df.rename(columns={
+ 'Start Time': 'started_at',
+ 'Stop Time': 'ended_at',
+ 'Start Station ID': 'start_station_id',
+ 'End Station ID': 'end_station_id',
+ 'Start Station Latitude': 'start_lat',
+ 'Start Station Longitude': 'start_lng',
+ 'End Station Latitude': 'end_lat',
+ 'End Station Longitude': 'end_lng',
+ })
+
+ if 'rideable_type' not in df.columns:
+ df['rideable_type'] = None
+
+ df = df[[
+ 'rideable_type', 'started_at', 'ended_at', 'start_station_id',
+ 'end_station_id', 'start_lat', 'start_lng', 'end_lat', 'end_lng',
+ ]]
+ df = df.replace({np.nan: None})
+
+ # Localize timestamps, convert to UTC, then convert to UNIX epoch to
+ # make things speedier when querying in sqlite
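+ # Hours made ambiguous or nonexistent by DST become NaT and are dropped below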
+ df['started_at'] = pd.to_datetime(df['started_at'])\
+ .dt.tz_localize(utils.TZ_NYC, ambiguous='NaT', nonexistent='NaT')\
+ .dt.tz_convert(utils.TZ_UTC)
+ df['ended_at'] = pd.to_datetime(df['ended_at'])\
+ .dt.tz_localize(utils.TZ_NYC, ambiguous='NaT', nonexistent='NaT')\
+ .dt.tz_convert(utils.TZ_UTC)
+
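+ # Name the index so reset_index() in derive_rollup yields a countable id column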
+ df.index.name = 'tempid'
+
+ return df.dropna(subset=['started_at', 'ended_at'])
@property
def is_downloaded(self) -> bool:
diff --git a/forerad/utils.py b/forerad/utils.py
@@ -1,5 +1,10 @@
import re
import logging
+import datetime
+import pytz
+
+TZ_NYC = pytz.timezone('America/New_York')
+TZ_UTC = pytz.timezone('UTC')
def parse_month_str(month_str: str) -> tuple[int, int]:
"""
@@ -13,6 +18,13 @@ def parse_month_str(month_str: str) -> tuple[int, int]:
year, month = match.groups()
return (int(year), int(month))
+def next_month(year: int, month: int) -> datetime.date:
+ """
+ Given a year and month, calculate the first day of the next month
+ """
+ date = datetime.date(year, month, 1)
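+ # Adding 32 days always lands in the next month; replace(day=1) snaps to its first day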
+ return (date + datetime.timedelta(days=32)).replace(day=1)
+
def get_logger():
logger = logging.getLogger('forerad')