forerad

Utilities for collecting and analyzing Citibike data in Python
Log | Files | Refs | README

persistence.py (2435B)


      1 import sqlite3
      2 import pandas as pd
      3 import numpy as np
      4 import datetime
      5 import pathlib
      6 import pytz
      7 
      8 import forerad.utils as utils
      9 
     10 class SQLiteStore():
     11     def __init__(self):
     12         db_path = pathlib.Path(__file__).parent.parent / 'data.sqlite3'
     13         self.db = sqlite3.connect(str(db_path))
     14 
     15     def __localize_date(self, date: datetime.date) -> int:
     16         """
     17         Convert a date to a localized datetime (in UNIX epoch seconds) that can
     18         be used to query the started_at and ended_at columns.
     19         """
     20         dt = datetime.datetime.combine(date, datetime.datetime.min.time())
     21         localized = utils.TZ_NYC.localize(dt)\
     22             .astimezone(utils.TZ_UTC)
     23 
     24         return int(localized.strftime('%s'))
     25 
     26 
     27     def fetch_hourly_volume_rollup(self, start_date: datetime.date, end_date: datetime.date) -> pd.DataFrame:
     28         query = """
     29             SELECT
     30                 datetime(datetime, 'unixepoch') AS datetime,
     31                 brooklyn_trips,
     32                 queens_trips,
     33                 manhattan_trips,
     34                 bronx_trips,
     35                 staten_island_trips
     36             FROM hourly_volume_rollup
     37             WHERE
     38                 datetime >= ? AND
     39                 datetime <= ?
     40         """
     41 
     42         start_dt = self.__localize_date(start_date)
     43         end_dt = self.__localize_date(end_date)
     44 
     45         results = pd.read_sql(query, self.db, params=(start_dt, end_dt))
     46         results['datetime'] = pd.to_datetime(results['datetime'])\
     47             .dt.tz_localize(utils.TZ_UTC)\
     48             .dt.tz_convert(utils.TZ_NYC)
     49 
     50         return results.set_index('datetime')
     51 
     52 
     53     def write_hourly_volume_rollup(self, values: pd.DataFrame):
     54         """
     55         Writes a daily volume rollup member to the table
     56         """
     57         values['started_at'] = (values['started_at'].astype(int) / 10**9).astype(int)
     58 
     59         query = """
     60             INSERT OR REPLACE INTO hourly_volume_rollup (
     61                 datetime,
     62                 local_date,
     63                 brooklyn_trips,
     64                 manhattan_trips,
     65                 queens_trips,
     66                 bronx_trips,
     67                 staten_island_trips
     68             ) VALUES (
     69                 :started_at, :local_date, :brooklyn_trips, :manhattan_trips,
     70                 :queens_trips, :bronx_trips, :staten_island_trips
     71             )
     72         """
     73 
     74         cur = self.db.cursor()
     75         cur.executemany(query, values.to_dict('records'))
     76         self.db.commit()