forerad

Utilities for collecting and analyzing Citibike data in Python

commit c5bb7d5df26da533163797c3ca0808b867d34dce
parent fdc40120167bc7700d2c16cd76dc3acfb21eac56
Author: Steve Gattuso <steve@stevegattuso.me>
Date:   Sun, 29 Oct 2023 22:08:40 +0100

get s3 cache working to speed up feedback cycle

Diffstat:
M .gitignore                     |  1 +
M forerad/scrapers/historical.py | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
M requirements.txt               |  5 +++++
M test.py                        |  5 ++++-
4 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/.gitignore b/.gitignore
@@ -1,2 +1,3 @@
 .direnv
 __pycache__
+.forerad-cache
diff --git a/forerad/scrapers/historical.py b/forerad/scrapers/historical.py
@@ -1,19 +1,58 @@
 import re
-import collections
 import datetime
 import logging
 import zipfile
 import io
 import boto3
+import pathlib
 import pandas as pd
 import botocore.client as bclient
 from botocore import UNSIGNED
 
 
-MonthlyArchive = collections.namedtuple('MonthlyArchive', [
-    'object_key',
-    'date',
-])
+class MonthlyArchive():
+    object_key: str
+    date: datetime.date
+    def __init__(self, object_key, date):
+        self.object_key = object_key
+        self.date = date
+
+    @property
+    def csv_name(self) -> str:
+        return (
+            self.object_key
+            .replace('.zip.csv', '.csv')
+            .replace('.zip', '.csv')
+        )
+
+
+class ArchiveCache():
+    def __init__(self):
+        self.cache_dir = pathlib.Path('.forerad-cache')
+        self.__ensure_cache()
+
+    def __ensure_cache(self):
+        if not self.cache_dir.exists():
+            self.cache_dir.mkdir()
+            logging.debug('Initializing .cache dir')
+        return None
+
+    def get_archive(self, archive: MonthlyArchive):
+        archive_path = self.cache_dir / archive.object_key
+        if not archive_path.exists():
+            return None
+
+        with open(archive_path, 'rb') as f:
+            logging.info(f"Loading {archive.object_key} from cache")
+            return io.BytesIO(f.read())
+
+    def store_archive(self, archive: MonthlyArchive, blob):
+        archive_path = self.cache_dir / archive.object_key
+        with open(archive_path, 'wb') as f:
+            logging.info(f"Storing {archive.object_key} in cache")
+            f.write(blob.getbuffer())
+
+        blob.seek(0)
 
 
 TRIP_BUCKET = 'tripdata'
@@ -56,25 +95,26 @@ def fetch_archive(archive: MonthlyArchive) -> pd.DataFrame | None:
     """
     Fetches an archive from S3 and unzips the contents into a readable blob.
     """
-    s3 = __get_s3_client()
-    resp = s3.get_object(Bucket=TRIP_BUCKET, Key=archive.object_key)
+    cache = ArchiveCache()
 
-    contents = io.BytesIO(resp['Body'].read())
-    with zipfile.ZipFile(contents, 'r') as zip_archive:
-        file_list = zip_archive.namelist()
+    archive_blob = cache.get_archive(archive)
+    if archive_blob is None:
+        logging.info(f"Fetching {archive.csv_name} from S3")
+        s3 = __get_s3_client()
+        resp = s3.get_object(Bucket=TRIP_BUCKET, Key=archive.object_key)
+        archive_blob = io.BytesIO(resp['Body'].read())
 
-        csv_name = (
-            archive.object_key
-            .replace('.zip.csv', '.csv')
-            .replace('.zip', '.csv')
-        )
+        cache.store_archive(archive, archive_blob)
+
+    with zipfile.ZipFile(archive_blob, 'r') as zip_archive:
+        file_list = zip_archive.namelist()
 
-        if csv_name not in file_list:
+        if archive.csv_name not in file_list:
             logging.error(f"Could not extract {archive.object_key}:")
             logging.error(file_list)
             return None
 
-        with zip_archive.open(csv_name, 'r') as csv:
+        with zip_archive.open(archive.csv_name, 'r') as csv:
             ride_df = pd.read_csv(csv)
 
     return ride_df
diff --git a/requirements.txt b/requirements.txt
@@ -1,5 +1,6 @@
 boto3==1.28.73
 botocore==1.31.73
+importlib-metadata==6.8.0
 jmespath==1.0.1
 numpy==1.26.1
 pandas==2.1.2
@@ -7,5 +8,9 @@ python-dateutil==2.8.2
 pytz==2023.3.post1
 s3transfer==0.7.0
 six==1.16.0
+sqlparse==0.4.4
+tabulate==0.9.0
 tzdata==2023.3
 urllib3==2.0.7
+yoyo-migrations==8.2.0
+zipp==3.17.0
diff --git a/test.py b/test.py
@@ -1,8 +1,11 @@
+import logging
 import forerad.scrapers.historical as historical
 
+logging.basicConfig(level=20)
+
 archives = historical.fetch_archives()
 
 print(f"Fetching {archives[0]}")
 trip_df = historical.fetch_archive(archives[0])
 print('Trip df:')
-print(trip_df)
+print(trip_df.columns)
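
For context, a minimal sketch of the feedback cycle this commit speeds up, based on the updated test.py (it assumes the script is run from the repository root so the forerad package is importable):

    import logging
    import forerad.scrapers.historical as historical

    # level 20 is logging.INFO (equivalent to level=20 in test.py),
    # which makes the cache hit/miss messages visible
    logging.basicConfig(level=logging.INFO)

    archives = historical.fetch_archives()

    # First run logs "Fetching ... from S3" and writes the zip blob into
    # .forerad-cache; a second run logs "Loading ... from cache" and skips
    # the download entirely.
    trip_df = historical.fetch_archive(archives[0])
    print(trip_df.columns)

Note that store_archive rewinds the blob with blob.seek(0) after writing, so the caller can hand the same BytesIO straight to zipfile.ZipFile on the cache-miss path.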