persistence.py (2435B)
1 import sqlite3 2 import pandas as pd 3 import numpy as np 4 import datetime 5 import pathlib 6 import pytz 7 8 import forerad.utils as utils 9 10 class SQLiteStore(): 11 def __init__(self): 12 db_path = pathlib.Path(__file__).parent.parent / 'data.sqlite3' 13 self.db = sqlite3.connect(str(db_path)) 14 15 def __localize_date(self, date: datetime.date) -> int: 16 """ 17 Convert a date to a localized datetime (in UNIX epoch seconds) that can 18 be used to query the started_at and ended_at columns. 19 """ 20 dt = datetime.datetime.combine(date, datetime.datetime.min.time()) 21 localized = utils.TZ_NYC.localize(dt)\ 22 .astimezone(utils.TZ_UTC) 23 24 return int(localized.strftime('%s')) 25 26 27 def fetch_hourly_volume_rollup(self, start_date: datetime.date, end_date: datetime.date) -> pd.DataFrame: 28 query = """ 29 SELECT 30 datetime(datetime, 'unixepoch') AS datetime, 31 brooklyn_trips, 32 queens_trips, 33 manhattan_trips, 34 bronx_trips, 35 staten_island_trips 36 FROM hourly_volume_rollup 37 WHERE 38 datetime >= ? AND 39 datetime <= ? 40 """ 41 42 start_dt = self.__localize_date(start_date) 43 end_dt = self.__localize_date(end_date) 44 45 results = pd.read_sql(query, self.db, params=(start_dt, end_dt)) 46 results['datetime'] = pd.to_datetime(results['datetime'])\ 47 .dt.tz_localize(utils.TZ_UTC)\ 48 .dt.tz_convert(utils.TZ_NYC) 49 50 return results.set_index('datetime') 51 52 53 def write_hourly_volume_rollup(self, values: pd.DataFrame): 54 """ 55 Writes a daily volume rollup member to the table 56 """ 57 values['started_at'] = (values['started_at'].astype(int) / 10**9).astype(int) 58 59 query = """ 60 INSERT OR REPLACE INTO hourly_volume_rollup ( 61 datetime, 62 local_date, 63 brooklyn_trips, 64 manhattan_trips, 65 queens_trips, 66 bronx_trips, 67 staten_island_trips 68 ) VALUES ( 69 :started_at, :local_date, :brooklyn_trips, :manhattan_trips, 70 :queens_trips, :bronx_trips, :staten_island_trips 71 ) 72 """ 73 74 cur = self.db.cursor() 75 cur.executemany(query, values.to_dict('records')) 76 self.db.commit()