commit fdc40120167bc7700d2c16cd76dc3acfb21eac56
parent fa9ffc6481e80341a33e2ba9ca3bc49b4ebb001b
Author: Steve Gattuso <steve@stevegattuso.me>
Date: Sun, 29 Oct 2023 21:37:53 +0100
listing and fetching archives works
Diffstat:
6 files changed, 100 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -1 +1,2 @@
.direnv
+__pycache__
diff --git a/forerad/__init__.py b/forerad/__init__.py
diff --git a/forerad/scrapers/__init__.py b/forerad/scrapers/__init__.py
diff --git a/forerad/scrapers/historical.py b/forerad/scrapers/historical.py
@@ -0,0 +1,80 @@
+import re
+import collections
+import datetime
+import logging
+import zipfile
+import io
+import boto3
+import pandas as pd
+import botocore.client as bclient
+from botocore import UNSIGNED
+
+MonthlyArchive = collections.namedtuple('MonthlyArchive', [
+ 'object_key',
+ 'date',
+])
+
+
+TRIP_BUCKET = 'tripdata'
+
+
+def __get_s3_client():
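+    # The tripdata bucket is public, so make unsigned (anonymous) requests
+    # rather than requiring AWS credentials.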
+ config = bclient.Config(signature_version=UNSIGNED)
+ return boto3.client('s3', config=config)
+
+
+def __parse_object(obj: dict) -> MonthlyArchive | None:
+ """
+ Converts an S3 object dictionary into a MonthlyArchive record, returning
+ None if the object doesn't look like a monthly ride archive.
+ """
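+    # Monthly archives are named like YYYYMM-citibike-tripdata.zip or
+    # YYYYMM-citibike-tripdata.csv.zip; anything else in the bucket is skipped.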
+    match = re.match(r"^([0-9]{4})([0-9]{2})-citibike-tripdata(\.csv)?\.zip$", obj['Key'])
+ if match is None:
+ logging.debug(f"Skipping object {obj['Key']}")
+ return None
+
+ groups = match.groups()
+
+ return MonthlyArchive(
+ object_key=obj['Key'],
+ date=datetime.date(int(groups[0]), int(groups[1]), 1)
+ )
+
+
+def fetch_archives() -> list[MonthlyArchive]:
+ """
+ Fetches a list of archive definitions from the Citibike S3 bucket
+ """
+ s3 = __get_s3_client()
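+    # Note: list_objects_v2 returns at most 1,000 keys per response;
+    # pagination isn't handled here.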
+ resp = s3.list_objects_v2(Bucket=TRIP_BUCKET)
+
+ archives = [__parse_object(o) for o in resp['Contents']]
+ return [a for a in archives if a is not None]
+
+
+def fetch_archive(archive: MonthlyArchive) -> pd.DataFrame | None:
+    """
+    Fetches an archive from S3, unzips it, and loads the contained CSV into a
+    DataFrame. Returns None if the expected CSV isn't found in the archive.
+    """
+ s3 = __get_s3_client()
+ resp = s3.get_object(Bucket=TRIP_BUCKET, Key=archive.object_key)
+
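+    # Buffer the whole response body in memory; ZipFile needs a seekable
+    # file object, which the streaming body is not.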
+ contents = io.BytesIO(resp['Body'].read())
+ with zipfile.ZipFile(contents, 'r') as zip_archive:
+ file_list = zip_archive.namelist()
+
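+        # The CSV inside the zip is assumed to share the object key's name,
+        # minus the .zip suffix.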
+        csv_name = (
+            archive.object_key
+            .replace('.csv.zip', '.csv')
+            .replace('.zip', '.csv')
+        )
+
+ if csv_name not in file_list:
+            logging.error(f"Could not find {csv_name} in {archive.object_key}; archive contains:")
+            logging.error(file_list)
+ return None
+
+ with zip_archive.open(csv_name, 'r') as csv:
+ ride_df = pd.read_csv(csv)
+
+ return ride_df
diff --git a/requirements.txt b/requirements.txt
@@ -0,0 +1,11 @@
+boto3==1.28.73
+botocore==1.31.73
+jmespath==1.0.1
+numpy==1.26.1
+pandas==2.1.2
+python-dateutil==2.8.2
+pytz==2023.3.post1
+s3transfer==0.7.0
+six==1.16.0
+tzdata==2023.3
+urllib3==2.0.7
diff --git a/test.py b/test.py
@@ -0,0 +1,8 @@
+import forerad.scrapers.historical as historical
+
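+# Smoke test: list the available archives, then load the first one into a DataFrame.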
+archives = historical.fetch_archives()
+print(f"Fetching {archives[0]}")
+trip_df = historical.fetch_archive(archives[0])
+
+print('Trip df:')
+print(trip_df)