forerad

Utilities for collecting and analyzing with Citibike data in Python
Log | Files | Refs | README

commit 17d351b9bda21c0685f6b31797358d7660e98073
parent ee68047881ac7e16ca5051a7a90abbca64efd08d
Author: Steve Gattuso <steve@stevegattuso.me>
Date:   Mon,  6 Nov 2023 13:02:08 +0100

add schema report

Diffstat:
Mbin/scraper | 29++++++++++++++++++++++++++---
Mforerad/scrapers/historical.py | 33++++++++++++++++++++-------------
2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/bin/scraper b/bin/scraper @@ -15,7 +15,7 @@ store = persistence.SQLiteStore() def main__fetch(args: argparse.Namespace): - archives = scrape_historical.fetch_remote_archives() + archives = scrape_historical.HistoricalTripArchive.list_remote() if args.month is not None: archives = [a for a in archives if a.date.strftime('%Y-%m') == args.month] @@ -33,8 +33,29 @@ def main__fetch(args: argparse.Namespace): archive.fetch_df() +def main__schema_report(args: argparse.Namespace): + archives = scrape_historical.HistoricalTripArchive.list_cached() + + if args.month is not None: + archives = [a for a in archives if a.month_str == args.month] + + print('Archive schema report:') + for archive in archives: + df = archive.fetch_df(normalize=False) + if 'starttime' in df.columns: + print(f'- {archive.month_str}: v1') + elif 'Start Time' in df.columns: + print(f'- {archive.month_str}: v2') + elif 'started_at' in df.columns: + print(f'- {archive.month_str}: v3') + else: + print(f'- {archive.month_str}: ??') + + if args.month: + print(f"Columns: {', '.join(df.columns)}") + def main__list(): - archives = scrape_historical.fetch_remote_archives() + archives = scrape_historical.HistoricalTripArchive.list_remote() print('Available archives:\n') for archive in archives: @@ -46,12 +67,14 @@ if __name__ == '__main__': parser = argparse.ArgumentParser( description="Backfill historical trip data into your data store", ) - parser.add_argument('action', choices=['list', 'fetch']) + parser.add_argument('action', choices=['list', 'fetch', 'schema-report']) parser.add_argument('--month', help="The month to download, in YYYY-MM format") args = parser.parse_args() if args.action == 'fetch': main__fetch(args) + elif args.action == 'schema-report': + main__schema_report(args) elif args.action == 'list': main__list() diff --git a/forerad/scrapers/historical.py b/forerad/scrapers/historical.py @@ -64,6 +64,18 @@ class HistoricalTripArchive(): ) @classmethod + def list_remote(cls): + """ + Returns a list of HistoricalTripArchive objects that are available on + S3 but may or may not be cached locally. + """ + s3 = __get_s3_client() + resp = s3.list_objects_v2(Bucket=TRIP_BUCKET) + + archives = [HistoricalTripArchive.from_s3(o) for o in resp['Contents']] + return [a for a in archives if a is not None] + + @classmethod def list_cached(cls): """ Returns a list of HistoricalTripArchive objects for all archives that have already @@ -79,7 +91,6 @@ class HistoricalTripArchive(): return None with open(archive_path, 'rb') as f: - utils.logger.info(f"Loading {self.object_key} from cache") return io.BytesIO(f.read()) def __store_blob(self, blob: io.BytesIO): @@ -90,7 +101,7 @@ class HistoricalTripArchive(): blob.seek(0) - def fetch_df(self) -> pd.DataFrame: + def fetch_df(self, normalize=True) -> pd.DataFrame: """ Fetches a DataFrame of the archive. If the archive doesn't exist on the filesystem it will attempt to download it from S3. @@ -119,6 +130,9 @@ class HistoricalTripArchive(): with zip_archive.open(csv_name, 'r') as csv: df = pd.read_csv(csv) + if not normalize: + return df + # Normalize the dataframe if 'starttime' in df.columns: # Transform v1 schema @@ -175,6 +189,10 @@ class HistoricalTripArchive(): return (CACHE_DIR / self.object_key).exists() @property + def month_str(self) -> str: + return f'{self.date.year}-{self.date.month:02d}' + + @property def csv_name(self) -> str: if '.csv.zip' in self.object_key: return self.object_key.replace('.csv.zip', '.csv') @@ -183,14 +201,3 @@ class HistoricalTripArchive(): def __repr__(self): return f'<HistoricalTripArchive {self.date} {self.object_key} />' - - -def fetch_remote_archives() -> list[HistoricalTripArchive]: - """ - Fetches a list of archive definitions from the Citibike S3 bucket - """ - s3 = __get_s3_client() - resp = s3.list_objects_v2(Bucket=TRIP_BUCKET) - - archives = [HistoricalTripArchive.from_s3(o) for o in resp['Contents']] - return [a for a in archives if a is not None]