From 099973b00079719c759e61666e907336344030e3 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 5 Apr 2020 16:44:14 -0500 Subject: [PATCH 01/19] Progress Bar: initial implementation using progresbar2 package --- app.py | 35 ++++++++++++++++++++++++++++++----- constants.py | 3 +++ etl/fetcher.py | 4 ++++ etl/fetcher_local.py | 15 +++++++++++++-- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/app.py b/app.py index fa4db94..5372ad0 100644 --- a/app.py +++ b/app.py @@ -3,7 +3,13 @@ ''' import os -from etl import command_line_args, extractor, fetcher, fetcher_local, loader, parser, transformer, utils +import sys +import logging.config +import constants +from etl import progress_bar, command_line_args, extractor, fetcher, fetcher_local, loader, parser, transformer, utils +import progressbar + +progressbar.streams.wrap_stderr() CSV = '.csv' # comma separated values DBF = '.dbf' # dbase @@ -18,16 +24,24 @@ SUPPORTED_FILE_EXT = [CSV, DBF, MDB, PRJ, SBN, SBX, SHP, SHX] if __name__ == '__main__': + # initialize logging + logging.config.fileConfig('data/logger/config.ini') + logger = logging.getLogger(__name__) + + # initialize progress bar + pbar = progress_bar.ProgressBar(100) + + # Parse Command line arguments commandLineArgs = command_line_args.getCommandLineArgs() # Fetcher if (commandLineArgs.local_sources): - print('using local data files', commandLineArgs.local_sources) - fetcher = fetcher_local.FetcherLocal() + logger.debug('Using local data files: %s', commandLineArgs.local_sources) + fetcher = fetcher_local.FetcherLocal(pbar,logger) filenames = commandLineArgs.local_sources responses = fetcher.fetch_all(filenames) else: - fetcher = fetcher.Fetcher() + fetcher = fetcher.Fetcher(pbar) src_yaml = utils.get_yaml('data/sources/sources.yml') responses = fetcher.fetch_all(src_yaml) @@ -35,9 +49,14 @@ parser = parser.Parser() for response in responses: try: + logger.debug('Parsing %s', response.name) + # set progress bar increment by passing in # of files 
to be parsed + pbar.set_increment(len(responses)) response.payload = parser.flatten(response, SUPPORTED_FILE_EXT) + # update progress bar + pbar.update() except Exception as err: - print(err) + logging.error(err) # Extractor extractor = extractor.Extractor() @@ -45,6 +64,9 @@ entity_dict = dict() entities = [] for response in responses: + # set progress bar increment by passing in # of files to be parsed + pbar.set_increment(len(responses)) + logging.debug('Extracting %s', response.name) for payload in response.payload: if utils.get_file_ext(payload.filename) == CSV: entities = extractor.get_csv_data(payload) @@ -58,6 +80,8 @@ entities = {} # Add to master entity list entity_dict.update(entities) + # update progress bar + pbar.update() # Transformer transform_tasks = utils.get_yaml('data/transform_tasks/transform_tasks.yml') @@ -67,5 +91,6 @@ # Loader db_yaml = utils.get_yaml('data/database/config.yml') loader = loader.Loader(db_yaml) + loader.connect() # TODO: insert, update tables using loader class diff --git a/constants.py b/constants.py index e69de29..cb539cb 100644 --- a/constants.py +++ b/constants.py @@ -0,0 +1,3 @@ + +# Declare global variables (callable across files) +TOTAL_STAGES = 5 diff --git a/etl/fetcher.py b/etl/fetcher.py index efc38be..a88d8ca 100755 --- a/etl/fetcher.py +++ b/etl/fetcher.py @@ -16,6 +16,10 @@ class Fetcher: + # Initializer / Instance Attributes + def __init__(self, pbar): + self.pbar = pbar + def fetch_all(self, src_yaml): ''' Returns a list of fetcher data objects as defined below. 
diff --git a/etl/fetcher_local.py b/etl/fetcher_local.py index 609687b..92b4fc9 100644 --- a/etl/fetcher_local.py +++ b/etl/fetcher_local.py @@ -3,19 +3,31 @@ from io import BytesIO import os from posixpath import basename +import constants +import logging # local testing replacement for Fetcher # load a subset of data from local files instead # of downloading class FetcherLocal: + # Initializer / Instance Attributes + def __init__(self, pbar,logger): + self.pbar = pbar + self.logger = logger + # self.logger = logging.getLogger(__name__) + def fetch_all(self, filenames): + # set progress bar increment by passing in # of files to be fetched + self.pbar.set_increment(len(filenames)) fetchedFiles = [] for filename in filenames: + self.logger.debug("Fetching local file: %s", filename) file = self.fetch(filename) fetchedFiles.append(file) - + # update progress bar + self.pbar.update() return fetchedFiles def fetch(self, filename): @@ -31,4 +43,3 @@ def fetch(self, filename): ) file.close() return fetchResponse - From dfc37c2eb1fc930e9dae6f468d7a426b710f5a89 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 5 Apr 2020 19:36:13 -0500 Subject: [PATCH 02/19] Progress Bar: refactor using enlighten package for stdout workaround --- app.py | 79 ++++--------------- constants.py | 3 - data/logger/config.ini | 23 ++++++ data/sources/sources.yml | 3 +- etl/constants.py | 12 +++ etl/extractor.py | 54 ++++++++++++- etl/fetcher.py | 16 +++- etl/fetcher_local.py | 16 ++-- etl/loader.py | 4 +- etl/parser.py | 23 ++++++ etl/transformer.py | 17 +++- .../vacant_table/vacant_table.py | 12 +-- 12 files changed, 172 insertions(+), 90 deletions(-) delete mode 100644 constants.py create mode 100644 data/logger/config.ini create mode 100644 etl/constants.py diff --git a/app.py b/app.py index 5372ad0..69bcfd1 100644 --- a/app.py +++ b/app.py @@ -5,92 +5,47 @@ import os import sys import logging.config -import constants -from etl import progress_bar, command_line_args, extractor, fetcher, 
fetcher_local, loader, parser, transformer, utils -import progressbar - -progressbar.streams.wrap_stderr() - -CSV = '.csv' # comma separated values -DBF = '.dbf' # dbase -MDB = '.mdb' # microsoft access database (jet, access, etc.) -PRJ = '.prj' # .shp support file -SBN = '.sbn' # .shp support file -SBX = '.sbx' # .shp support file -SHP = '.shp' # shapes -SHX = '.shx' # .shp support file +from etl.constants import * +from etl import command_line_args, extractor, fetcher, fetcher_local, loader, parser, transformer, utils +import enlighten ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) -SUPPORTED_FILE_EXT = [CSV, DBF, MDB, PRJ, SBN, SBX, SHP, SHX] if __name__ == '__main__': - # initialize logging - logging.config.fileConfig('data/logger/config.ini') - logger = logging.getLogger(__name__) - - # initialize progress bar - pbar = progress_bar.ProgressBar(100) - # Parse Command line arguments commandLineArgs = command_line_args.getCommandLineArgs() + # Setup logging + logging.config.fileConfig('data/logger/config.ini') + logger = logging.getLogger(__name__) + # Setup progress bar manager + pbar_manager = enlighten.get_manager() # Fetcher if (commandLineArgs.local_sources): - logger.debug('Using local data files: %s', commandLineArgs.local_sources) - fetcher = fetcher_local.FetcherLocal(pbar,logger) + logger.info("Using local data files: {}".format(' '.join(map(str, commandLineArgs.local_sources)))) + fetcher = fetcher_local.FetcherLocal(pbar_manager) filenames = commandLineArgs.local_sources responses = fetcher.fetch_all(filenames) else: - fetcher = fetcher.Fetcher(pbar) + fetcher = fetcher.Fetcher(pbar_manager) src_yaml = utils.get_yaml('data/sources/sources.yml') responses = fetcher.fetch_all(src_yaml) # Parser - parser = parser.Parser() - for response in responses: - try: - logger.debug('Parsing %s', response.name) - # set progress bar increment by passing in # of files to be parsed - pbar.set_increment(len(responses)) - response.payload = 
parser.flatten(response, SUPPORTED_FILE_EXT) - # update progress bar - pbar.update() - except Exception as err: - logging.error(err) + parser = parser.Parser(pbar_manager) + responses = parser.parse_all(responses) # Extractor - extractor = extractor.Extractor() - # Master entity list - entity_dict = dict() - entities = [] - for response in responses: - # set progress bar increment by passing in # of files to be parsed - pbar.set_increment(len(responses)) - logging.debug('Extracting %s', response.name) - for payload in response.payload: - if utils.get_file_ext(payload.filename) == CSV: - entities = extractor.get_csv_data(payload) - elif utils.get_file_ext(payload.filename) == MDB: - entities = extractor.get_mdb_data(payload) - elif utils.get_file_ext(payload.filename) == DBF: - entities = extractor.get_dbf_data(payload) - elif utils.get_file_ext(payload.filename) == SHP: - entities = extractor.get_shp_data(response, payload) - else: - entities = {} - # Add to master entity list - entity_dict.update(entities) - # update progress bar - pbar.update() + extractor = extractor.Extractor(pbar_manager) + entity_dict = extractor.extract_all(responses) # Transformer transform_tasks = utils.get_yaml('data/transform_tasks/transform_tasks.yml') - transformer = transformer.Transformer() + transformer = transformer.Transformer(pbar_manager) transformed = transformer.transform_all(entity_dict, transform_tasks) # Loader db_yaml = utils.get_yaml('data/database/config.yml') - loader = loader.Loader(db_yaml) - + loader = loader.Loader(db_yaml, pbar_manager) loader.connect() # TODO: insert, update tables using loader class diff --git a/constants.py b/constants.py deleted file mode 100644 index cb539cb..0000000 --- a/constants.py +++ /dev/null @@ -1,3 +0,0 @@ - -# Declare global variables (callable across files) -TOTAL_STAGES = 5 diff --git a/data/logger/config.ini b/data/logger/config.ini new file mode 100644 index 0000000..e53fa4a --- /dev/null +++ b/data/logger/config.ini @@ -0,0 
+1,23 @@ +[loggers] +keys=root + +[handlers] +keys=consoleHandler + +[formatters] +keys=simpleFormatter + +[logger_root] +level=DEBUG +handlers=consoleHandler + +[handler_consoleHandler] +class=StreamHandler +level=DEBUG +disable_existing_loggers=False +formatter=simpleFormatter +args=(sys.stdout,) + +[formatter_simpleFormatter] +format=%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s +datefmt=%Y-%m-%d:%H:%M:%S diff --git a/data/sources/sources.yml b/data/sources/sources.yml index 91132ae..3db3b6a 100644 --- a/data/sources/sources.yml +++ b/data/sources/sources.yml @@ -7,7 +7,8 @@ prcl: par: url: https://www.stlouis-mo.gov/data/upload/data-files/par.zip -lra_public: +LRA_Iventory_Records: + info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=65 url: https://www.stlouis-mo.gov/data/upload/data-files/lra_public.zip bldginsp: diff --git a/etl/constants.py b/etl/constants.py new file mode 100644 index 0000000..7c48f60 --- /dev/null +++ b/etl/constants.py @@ -0,0 +1,12 @@ + +# Declare global variables (callable across files) +TOTAL_STAGES = 5 +CSV = '.csv' # comma separated values +DBF = '.dbf' # dbase +MDB = '.mdb' # microsoft access database (jet, access, etc.) 
+PRJ = '.prj' # .shp support file +SBN = '.sbn' # .shp support file +SBX = '.sbx' # .shp support file +SHP = '.shp' # shapes +SHX = '.shx' # .shp support file +SUPPORTED_FILE_EXT = [CSV, DBF, MDB, PRJ, SBN, SBX, SHP, SHX] diff --git a/etl/extractor.py b/etl/extractor.py index 1155637..1ba79b1 100644 --- a/etl/extractor.py +++ b/etl/extractor.py @@ -8,7 +8,10 @@ import shutil from dbfread import DBF as DBFobj from io import StringIO -from etl.entity import Entity +# from etl.entity import Entity +import logging +from etl import utils +from etl.constants import * class Extractor: @@ -18,8 +21,40 @@ class Extractor: ''' # Initializer / Instance Attributes - def __init__(self): - pass + def __init__(self, pbar_manager): + self.pbar_manager = pbar_manager + self.logger = logging.getLogger(__name__) + + def extract_all(self, responses): + ''' + Returns a list of extracted data objects from responses + ''' + # Setup Extract stage progress bar + self.job_count = len(responses) + self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__) + + # Prepare nested dictionary to store extracted table lists + entity_dict = dict() + entities = dict() + for response in responses: + for payload in response.payload: + if utils.get_file_ext(payload.filename) == CSV: + entities = self.get_csv_data(payload) + elif utils.get_file_ext(payload.filename) == MDB: + entities = self.get_mdb_data(payload) + elif utils.get_file_ext(payload.filename) == DBF: + entities = self.get_dbf_data(payload) + elif utils.get_file_ext(payload.filename) == SHP: + entities = self.get_shp_data(response, payload) + else: + entities = {} + # Add to master entity list + entity_dict.update(entities) + # update progress bar + self.pbar.update() + # close progress bar + self.pbar.close() + return entity_dict # Get attributes that matches user-specified data type def get_attributes_by_data_type(self, db_schema, data_type): @@ -45,6 +80,7 @@ def get_csv_data(self, payload): Arguments: payload -- payload 
object (str,binary) ''' + self.logger.debug('Extracting file: %s', payload.filename) # Convert ByteIO to string content = str(payload.data.getvalue(),'utf-8') # Feed string as StringIO into pandas read_csv function() @@ -73,13 +109,21 @@ def get_mdb_data(self, payload): # Declare entity dict entity_dict = dict() + + # Get list of table from database + table_list = pandas_access.list_tables(payload.filename) + # Setup MDB job progress bar + child_pbar = self.pbar_manager.counter(total=len(table_list), desc=' get_mdb_data') # Iterate through each table in database - for tbl in pandas_access.list_tables(payload.filename): + for tbl in table_list: + self.logger.debug('Extracting table: %s from file: %s', tbl, payload.filename) # Issue: Default pandas integer type is not nullable - null values in integer column causes read error # Workaround: Read integer as Int64 (pandas nullable integer type in pandas) dtype_input = {attribute:'Int64' for attribute in integer_attributes[tbl]} df = pandas_access.read_table(payload.filename, tbl, dtype = dtype_input) entity_dict.update({tbl:df}) + # update progress bar + child_pbar.update() return entity_dict # Extract .dbf data @@ -90,6 +134,7 @@ def get_dbf_data(self, payload): Arguments: payload -- payload object (str,binary) ''' + self.logger.debug('Extracting file: %s', payload.filename) # TODO: find a way to directly pass byteio into DBFobj without writing to disk # Write bytes to disk open(payload.filename, 'wb').write(payload.data.getvalue()) @@ -111,6 +156,7 @@ def get_shp_data(self, archive, shapefile): archive -- fetcher response from the archive containing the shape file and supporting files (FetcherResponse) shapefile -- the shape file from the archive; looks like the "payload" argument in other extractors ''' + self.logger.debug('Extracting \'file\': %s', shapefile.filename) SCRATCH_DIR = 'scratch' try: # .shp requires multiple supporting files; save all files from archive to disk diff --git a/etl/fetcher.py 
b/etl/fetcher.py index a88d8ca..1adcc95 100755 --- a/etl/fetcher.py +++ b/etl/fetcher.py @@ -12,22 +12,32 @@ from urllib.request import urlopen from urllib.parse import urlparse from urllib.error import URLError +import logging class Fetcher: # Initializer / Instance Attributes - def __init__(self, pbar): - self.pbar = pbar + def __init__(self, pbar_manager): + self.pbar_manager = pbar_manager + self.logger = logging.getLogger(__name__) def fetch_all(self, src_yaml): ''' Returns a list of fetcher data objects as defined below. ''' + # Setup Fetch stage progress bar + job_count = len(src_yaml.keys()) + self.pbar = self.pbar_manager.counter(total=job_count, desc=__name__) + data = [] for key in src_yaml.keys(): + self.logger.debug("Fetching data: %s", key) data.append(self.fetch(key, src_yaml[key]['url'])) - + # update progress bar + self.pbar.update() + # Close progress bar + self.pbar.close() return data def fetch(self, name, url): diff --git a/etl/fetcher_local.py b/etl/fetcher_local.py index 92b4fc9..79f0da1 100644 --- a/etl/fetcher_local.py +++ b/etl/fetcher_local.py @@ -3,7 +3,6 @@ from io import BytesIO import os from posixpath import basename -import constants import logging # local testing replacement for Fetcher @@ -12,14 +11,15 @@ class FetcherLocal: # Initializer / Instance Attributes - def __init__(self, pbar,logger): - self.pbar = pbar - self.logger = logger - # self.logger = logging.getLogger(__name__) + def __init__(self, pbar_manager): + self.pbar_manager = pbar_manager + self.logger = logging.getLogger(__name__) def fetch_all(self, filenames): - # set progress bar increment by passing in # of files to be fetched - self.pbar.set_increment(len(filenames)) + # Setup Fetch stage progress bar + job_count = len(filenames) + self.pbar = self.pbar_manager.counter(total=job_count, desc=__name__) + fetchedFiles = [] for filename in filenames: @@ -28,6 +28,8 @@ def fetch_all(self, filenames): fetchedFiles.append(file) # update progress bar self.pbar.update() + 
# Close progress bar + self.pbar.close() return fetchedFiles def fetch(self, filename): diff --git a/etl/loader.py b/etl/loader.py index b9e6e54..b9face8 100644 --- a/etl/loader.py +++ b/etl/loader.py @@ -12,8 +12,10 @@ class Loader: _config = None # Initializer / Instance Attributes - def __init__(self,config_yaml): + def __init__(self, config_yaml, pbar_manager): self._config = config_yaml + self.pbar_manager = pbar_manager + self.logger = logging.getLogger(__name__) # Get credentials from yaml config def get_credentials(self, config_yaml, match_key): diff --git a/etl/parser.py b/etl/parser.py index 65878ef..7756710 100644 --- a/etl/parser.py +++ b/etl/parser.py @@ -1,8 +1,31 @@ from etl.utils import decompress, get_file_ext import zipfile +import logging +from etl.constants import * class Parser: + # Initializer / Instance Attributes + def __init__(self, pbar_manager): + self.pbar_manager = pbar_manager + self.logger = logging.getLogger(__name__) + + def parse_all(self, responses): + # Setup Extract stage progress bar + job_count = len(responses) + self.pbar = self.pbar_manager.counter(total=job_count, desc=__name__) + for response in responses: + try: + self.logger.debug('Parsing file: %s', response.name) + response.payload = self.flatten(response, SUPPORTED_FILE_EXT) + except Exception as err: + self.logger.error(err) + # update progress bar + self.pbar.update() + # close progress bar + self.pbar.close() + return responses + def flatten(self, response, extensions=None): for payload in response.payload: if zipfile.is_zipfile(payload.data): diff --git a/etl/transformer.py b/etl/transformer.py index 7f4342e..219359e 100644 --- a/etl/transformer.py +++ b/etl/transformer.py @@ -1,7 +1,10 @@ from .transformer_tasks.vacant_table.vacant_table import vacant_table +import logging class Transformer: - def __init__(self): + def __init__(self, pbar_manager): + self.pbar_manager = pbar_manager + self.logger = logging.getLogger(__name__) self.transform_tasks_to_transform_fns 
= { 'vacant_table': vacant_table } @@ -23,6 +26,10 @@ def transform_all(self, extracted, transform_task_list): transformed['task_name'] = ''' + # Setup Extract stage progress bar + job_count = len(transform_task_list) + self.pbar = self.pbar_manager.counter(total=job_count, desc=__name__) + transformed = {} for task_name in transform_task_list: @@ -31,9 +38,11 @@ def transform_all(self, extracted, transform_task_list): 'not_found' ) if (transform_task_fn == 'not_found'): - print('unknown transform task ' + task_name) + self.logger.error('unknown transform task ' + task_name) continue transformed[task_name] = transform_task_fn(extracted) - + # update progress bar + self.pbar.update() + # Close progress bar + self.pbar.close() return transformed - \ No newline at end of file diff --git a/etl/transformer_tasks/vacant_table/vacant_table.py b/etl/transformer_tasks/vacant_table/vacant_table.py index eaed2de..381e24b 100644 --- a/etl/transformer_tasks/vacant_table/vacant_table.py +++ b/etl/transformer_tasks/vacant_table/vacant_table.py @@ -1,12 +1,13 @@ from .map_parcel_data_to_vacant_table import map_parcel_data_to_vacant_table, vacant_table_fields from .merge_parcel_data import merge_parcel_data +import logging def keep_only_select_columns_in_df(df, columnsToKeep): df.drop(df.columns.difference(columnsToKeep), axis=1, inplace=True) def vacant_table(df): ''' - Transform extractor output into a dataframe that resembles the + Transform extractor output into a dataframe that resembles the "vacant" table Arguments: @@ -17,7 +18,8 @@ def vacant_table(df): the vacant app table. 
''' - print('starting transformer vacant_table') + elf.logger = logging.getLogger(__name__) + self.logger.debug('starting transformer vacant_table') # merge parcel data into a single dataframe with a "one row = one parcel" format merged_parcel_data = merge_parcel_data(df) @@ -26,10 +28,10 @@ def vacant_table(df): map_parcel_data_to_vacant_table(merged_parcel_data) # remove fields we don't need - print('prune unneeded fields') + self.logger.debug('prune unneeded fields') keep_only_select_columns_in_df(merged_parcel_data, vacant_table_fields.keys()) - - print(merged_parcel_data) + + self.logger.debug(merged_parcel_data) merged_parcel_data.to_csv('transform_vacant_table.csv', index=False) From 20b77ae139f0c194c1e48670ff9c9568efe488a2 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Mon, 6 Apr 2020 22:05:36 -0500 Subject: [PATCH 03/19] Progress Bar: implemented detailed progress increments for Extractor --- data/sources/sources.yml | 18 +++++++++----- etl/extractor.py | 24 ++++++++++++------- etl/fetcher.py | 6 ++--- etl/fetcher_local.py | 6 ++--- etl/fetcher_response.py | 15 ++++++++++++ etl/loader.py | 1 + etl/parser.py | 6 ++--- etl/transformer.py | 5 ++-- .../map_parcel_data_to_vacant_table.py | 5 ++-- .../vacant_table/merge_parcel_data.py | 13 +++++----- .../vacant_table/vacant_table.py | 7 +++--- requirements.txt | 4 ++++ 12 files changed, 72 insertions(+), 38 deletions(-) diff --git a/data/sources/sources.yml b/data/sources/sources.yml index 3db3b6a..c401b89 100644 --- a/data/sources/sources.yml +++ b/data/sources/sources.yml @@ -1,21 +1,27 @@ -prcl_shape: +ESRI_Parcels_Shapefile: + info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=84 url: https://www.stlouis-mo.gov/data/upload/data-files/prcl_shape.zip -prcl: +Parcels_Key: + info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=83 url: https://www.stlouis-mo.gov/data/upload/data-files/prcl.zip -par: +Parcel_Data: + info: 
https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=85 url: https://www.stlouis-mo.gov/data/upload/data-files/par.zip LRA_Iventory_Records: info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=65 url: https://www.stlouis-mo.gov/data/upload/data-files/lra_public.zip -bldginsp: +Building_Inspections: + info: https://www.stlouis-mo.gov/data/datasets/dataset.cfm?id=11 url: https://www.stlouis-mo.gov/data/upload/data-files/bldginsp.zip -prmbdo: +Building_Permits: + info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=3 url: https://www.stlouis-mo.gov/data/upload/data-files/prmbdo.zip -forestry_maintenance_properties: +Forestry_Property_Maintenance_Data: + Info: https://www.stlouis-mo.gov/data/datasets/dataset.cfm?id=64 url: https://www.stlouis-mo.gov/data/upload/data-files/forestry-maintenance-properties.csv diff --git a/etl/extractor.py b/etl/extractor.py index 1ba79b1..a8dc4e8 100644 --- a/etl/extractor.py +++ b/etl/extractor.py @@ -23,6 +23,7 @@ class Extractor: # Initializer / Instance Attributes def __init__(self, pbar_manager): self.pbar_manager = pbar_manager + self.job_count = 0 self.logger = logging.getLogger(__name__) def extract_all(self, responses): @@ -30,14 +31,16 @@ def extract_all(self, responses): Returns a list of extracted data objects from responses ''' # Setup Extract stage progress bar - self.job_count = len(responses) - self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__) + self.job_count = sum(map(lambda response: response.payload_count(), responses)) + # self.logger.debug("self.job_count: %s",self.job_count) + self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') # Prepare nested dictionary to store extracted table lists entity_dict = dict() entities = dict() for response in responses: for payload in response.payload: + # Extract payload if utils.get_file_ext(payload.filename) == CSV: entities = self.get_csv_data(payload) elif 
utils.get_file_ext(payload.filename) == MDB: @@ -80,7 +83,7 @@ def get_csv_data(self, payload): Arguments: payload -- payload object (str,binary) ''' - self.logger.debug('Extracting file: %s', payload.filename) + self.logger.debug('Extracting file: %s...', payload.filename) # Convert ByteIO to string content = str(payload.data.getvalue(),'utf-8') # Feed string as StringIO into pandas read_csv function() @@ -112,18 +115,21 @@ def get_mdb_data(self, payload): # Get list of table from database table_list = pandas_access.list_tables(payload.filename) - # Setup MDB job progress bar - child_pbar = self.pbar_manager.counter(total=len(table_list), desc=' get_mdb_data') + + # Update progress bar job count + self.job_count += len(table_list) + self.pbar.total = self.job_count + # Iterate through each table in database for tbl in table_list: - self.logger.debug('Extracting table: %s from file: %s', tbl, payload.filename) + self.logger.debug('Extracting table: \'%s\' from file: %s...', tbl, payload.filename) # Issue: Default pandas integer type is not nullable - null values in integer column causes read error # Workaround: Read integer as Int64 (pandas nullable integer type in pandas) dtype_input = {attribute:'Int64' for attribute in integer_attributes[tbl]} df = pandas_access.read_table(payload.filename, tbl, dtype = dtype_input) entity_dict.update({tbl:df}) # update progress bar - child_pbar.update() + self.pbar.update() return entity_dict # Extract .dbf data @@ -134,7 +140,7 @@ def get_dbf_data(self, payload): Arguments: payload -- payload object (str,binary) ''' - self.logger.debug('Extracting file: %s', payload.filename) + self.logger.debug('Extracting file: %s...', payload.filename) # TODO: find a way to directly pass byteio into DBFobj without writing to disk # Write bytes to disk open(payload.filename, 'wb').write(payload.data.getvalue()) @@ -156,7 +162,7 @@ def get_shp_data(self, archive, shapefile): archive -- fetcher response from the archive containing the shape 
file and supporting files (FetcherResponse) shapefile -- the shape file from the archive; looks like the "payload" argument in other extractors ''' - self.logger.debug('Extracting \'file\': %s', shapefile.filename) + self.logger.debug('Extracting file: %s...', shapefile.filename) SCRATCH_DIR = 'scratch' try: # .shp requires multiple supporting files; save all files from archive to disk diff --git a/etl/fetcher.py b/etl/fetcher.py index 1adcc95..a132a1f 100755 --- a/etl/fetcher.py +++ b/etl/fetcher.py @@ -27,12 +27,12 @@ def fetch_all(self, src_yaml): Returns a list of fetcher data objects as defined below. ''' # Setup Fetch stage progress bar - job_count = len(src_yaml.keys()) - self.pbar = self.pbar_manager.counter(total=job_count, desc=__name__) + self.job_count = len(src_yaml.keys()) + self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') data = [] for key in src_yaml.keys(): - self.logger.debug("Fetching data: %s", key) + self.logger.debug("Fetching data: %s (%s)...", key, src_yaml[key]['url'].rsplit('/', 1)[-1]) data.append(self.fetch(key, src_yaml[key]['url'])) # update progress bar self.pbar.update() diff --git a/etl/fetcher_local.py b/etl/fetcher_local.py index 79f0da1..795e87a 100644 --- a/etl/fetcher_local.py +++ b/etl/fetcher_local.py @@ -17,13 +17,13 @@ def __init__(self, pbar_manager): def fetch_all(self, filenames): # Setup Fetch stage progress bar - job_count = len(filenames) - self.pbar = self.pbar_manager.counter(total=job_count, desc=__name__) + self.job_count = len(filenames) + self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') fetchedFiles = [] for filename in filenames: - self.logger.debug("Fetching local file: %s", filename) + self.logger.debug("Fetching local file: %s...", filename) file = self.fetch(filename) fetchedFiles.append(file) # update progress bar diff --git a/etl/fetcher_response.py b/etl/fetcher_response.py index 9861270..1f99eea 100644 --- 
a/etl/fetcher_response.py +++ b/etl/fetcher_response.py @@ -37,3 +37,18 @@ def to_dict(self): 'source': self.source, 'error': self.error } + + def payload_count(self): + ''' + returns number of items in self.payload + ''' + # check if payload is defined + if self.payload is None: + return 0 + + # check if payload is a list or a dict + if isinstance(self.payload, list) or isinstance(self.payload, dict): + return len(self.payload) + + # if neither, assume object is payload object + return 1 diff --git a/etl/loader.py b/etl/loader.py index b9face8..8f8be47 100644 --- a/etl/loader.py +++ b/etl/loader.py @@ -1,5 +1,6 @@ import yaml import sqlalchemy +import logging class Loader: ''' diff --git a/etl/parser.py b/etl/parser.py index 7756710..4992667 100644 --- a/etl/parser.py +++ b/etl/parser.py @@ -12,11 +12,11 @@ def __init__(self, pbar_manager): def parse_all(self, responses): # Setup Extract stage progress bar - job_count = len(responses) - self.pbar = self.pbar_manager.counter(total=job_count, desc=__name__) + self.job_count = len(responses) + self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') for response in responses: try: - self.logger.debug('Parsing file: %s', response.name) + self.logger.debug('Parsing file: %s...', response.name) response.payload = self.flatten(response, SUPPORTED_FILE_EXT) except Exception as err: self.logger.error(err) diff --git a/etl/transformer.py b/etl/transformer.py index 219359e..68b847f 100644 --- a/etl/transformer.py +++ b/etl/transformer.py @@ -27,8 +27,9 @@ def transform_all(self, extracted, transform_task_list): ''' # Setup Extract stage progress bar - job_count = len(transform_task_list) - self.pbar = self.pbar_manager.counter(total=job_count, desc=__name__) + self.job_count = len(transform_task_list) + self.logger.debug("transform job_count: %s", self.job_count) + self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__) transformed = {} diff --git 
a/etl/transformer_tasks/vacant_table/map_parcel_data_to_vacant_table.py b/etl/transformer_tasks/vacant_table/map_parcel_data_to_vacant_table.py index 50d22f7..4640121 100644 --- a/etl/transformer_tasks/vacant_table/map_parcel_data_to_vacant_table.py +++ b/etl/transformer_tasks/vacant_table/map_parcel_data_to_vacant_table.py @@ -1,4 +1,5 @@ from . import fields +import logging ''' vacant_table_fields is a dictionary representing every field in the "vacant" table @@ -51,12 +52,12 @@ def map_parcel_data_to_vacant_table(merged_parcel_data): merged_parcel_data -- output of merging Prcl with other parcel related dataframes ''' - print('map data to columns') + logging.debug('map data to columns') for column, source in vacant_table_fields.items(): if column == source: continue elif source == 'TODO': - print('-- TODO implement field %s' % column) + logging.debug('-- TODO implement field %s' % column) elif type(source) == str: merged_parcel_data[column] = merged_parcel_data[source] else: diff --git a/etl/transformer_tasks/vacant_table/merge_parcel_data.py b/etl/transformer_tasks/vacant_table/merge_parcel_data.py index f90ffb7..1b40872 100644 --- a/etl/transformer_tasks/vacant_table/merge_parcel_data.py +++ b/etl/transformer_tasks/vacant_table/merge_parcel_data.py @@ -1,4 +1,5 @@ from .parcel_id import parcel_id +import logging def merge_parcel_data(df): ''' @@ -11,19 +12,19 @@ def merge_parcel_data(df): ''' # add ParcelId as a merge index - print('add ParcelId to bldgcom, bldgres') + logging.debug('add ParcelId to bldgcom, bldgres') add_parcel_id_column_to_df(df['BldgCom']) add_parcel_id_column_to_df(df['BldgRes']) - print('merge tables with prcl') - print('-BldgCom') + logging.debug('merge tables with prcl') + logging.debug('-BldgCom') prclWithBldgCom = df['Prcl'].merge( right=df['BldgCom'], how='left', on='ParcelId' ) - print('-BldgRes') + logging.debug('-BldgRes') fullyMergedPrcl = prclWithBldgCom.merge( right=df['BldgRes'], how='left', @@ -33,7 +34,7 @@ def 
merge_parcel_data(df): # convert pk to int so we can merge with other data sources fullyMergedPrcl['Handle'] = fullyMergedPrcl['Handle'].astype(int) - print('-par.dbf') + logging.debug('-par.dbf') parDbf = df['par.dbf'].copy(deep=True) # cast par.dbf pk to match prcl pk type parDbf['HANDLE'] = parDbf['HANDLE'].astype(int) @@ -44,7 +45,7 @@ def merge_parcel_data(df): right_on='HANDLE' ) - print('-prcl.shp') + logging.debug('-prcl.shp') prclShp = df['prcl.shp'].copy(deep=True) # again we need to change the pk type to match prcl prclShp['HANDLE'] = prclShp['HANDLE'].astype(int) diff --git a/etl/transformer_tasks/vacant_table/vacant_table.py b/etl/transformer_tasks/vacant_table/vacant_table.py index 381e24b..c3ee800 100644 --- a/etl/transformer_tasks/vacant_table/vacant_table.py +++ b/etl/transformer_tasks/vacant_table/vacant_table.py @@ -18,8 +18,7 @@ def vacant_table(df): the vacant app table. ''' - elf.logger = logging.getLogger(__name__) - self.logger.debug('starting transformer vacant_table') + logging.debug('starting transformer vacant_table') # merge parcel data into a single dataframe with a "one row = one parcel" format merged_parcel_data = merge_parcel_data(df) @@ -28,10 +27,10 @@ def vacant_table(df): map_parcel_data_to_vacant_table(merged_parcel_data) # remove fields we don't need - self.logger.debug('prune unneeded fields') + logging.debug('prune unneeded fields') keep_only_select_columns_in_df(merged_parcel_data, vacant_table_fields.keys()) - self.logger.debug(merged_parcel_data) + logging.debug(merged_parcel_data) merged_parcel_data.to_csv('transform_vacant_table.csv', index=False) diff --git a/requirements.txt b/requirements.txt index 15a658a..cac565c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,12 @@ astroid==2.3.3 attrs==19.3.0 autopep8==1.4.4 +blessed==1.17.4 Click==7.0 click-plugins==1.1.1 cligj==0.5.0 dbfread==2.0.7 +enlighten==1.5.1 Fiona==1.8.13 geopandas==0.6.2 isort==4.3.21 @@ -19,10 +21,12 @@ pylint==2.4.4 PyMySQL==0.9.3 
pyproj==2.4.2.post1 python-dateutil==2.8.1 +python-utils==2.4.0 pytz==2019.3 PyYAML==5.3 Shapely==1.7.0 six==1.13.0 SQLAlchemy==1.3.12 typed-ast==1.4.0 +wcwidth==0.1.9 wrapt==1.11.2 From 740154185f88c755dbd2887f610dcb0c13cd01d9 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Tue, 7 Apr 2020 00:42:21 -0500 Subject: [PATCH 04/19] Progress Bar: Fixed comment --- etl/extractor.py | 2 +- etl/fetcher.py | 2 +- etl/fetcher_local.py | 2 +- etl/parser.py | 2 +- etl/transformer.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/etl/extractor.py b/etl/extractor.py index a8dc4e8..73581a0 100644 --- a/etl/extractor.py +++ b/etl/extractor.py @@ -30,7 +30,7 @@ def extract_all(self, responses): ''' Returns a list of extracted data objects from responses ''' - # Setup Extract stage progress bar + # Setup progress bar self.job_count = sum(map(lambda response: response.payload_count(), responses)) # self.logger.debug("self.job_count: %s",self.job_count) self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') diff --git a/etl/fetcher.py b/etl/fetcher.py index a132a1f..174f380 100755 --- a/etl/fetcher.py +++ b/etl/fetcher.py @@ -26,7 +26,7 @@ def fetch_all(self, src_yaml): ''' Returns a list of fetcher data objects as defined below. 
''' - # Setup Fetch stage progress bar + # Setup progress bar self.job_count = len(src_yaml.keys()) self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') diff --git a/etl/fetcher_local.py b/etl/fetcher_local.py index 795e87a..7d092f7 100644 --- a/etl/fetcher_local.py +++ b/etl/fetcher_local.py @@ -16,7 +16,7 @@ def __init__(self, pbar_manager): self.logger = logging.getLogger(__name__) def fetch_all(self, filenames): - # Setup Fetch stage progress bar + # Setup progress bar self.job_count = len(filenames) self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') diff --git a/etl/parser.py b/etl/parser.py index 4992667..13807af 100644 --- a/etl/parser.py +++ b/etl/parser.py @@ -11,7 +11,7 @@ def __init__(self, pbar_manager): self.logger = logging.getLogger(__name__) def parse_all(self, responses): - # Setup Extract stage progress bar + # Setup progress bar self.job_count = len(responses) self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') for response in responses: diff --git a/etl/transformer.py b/etl/transformer.py index 68b847f..df889e3 100644 --- a/etl/transformer.py +++ b/etl/transformer.py @@ -26,7 +26,7 @@ def transform_all(self, extracted, transform_task_list): transformed['task_name'] = ''' - # Setup Extract stage progress bar + # Setup progress bar self.job_count = len(transform_task_list) self.logger.debug("transform job_count: %s", self.job_count) self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__) From a4dbb77d50e7c15ff1c6058b782b4ab73646da1c Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sat, 11 Apr 2020 15:15:35 -0500 Subject: [PATCH 05/19] changes to ensure merge is working --- app.py | 3 ++- etl/extractor.py | 15 +++++++++------ etl/loader.py | 2 +- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/app.py b/app.py index 61a4f3e..c3c8077 100644 --- a/app.py +++ b/app.py @@ -18,7 +18,8 @@ 
logging.config.fileConfig('data/logger/config.ini') logger = logging.getLogger(__name__) # Setup progress bar manager - pbar_manager = enlighten.get_manager() + pbar_manager = enlighten.get_manager() + # notify user if the app will be using test or prod db if (commandLineArgs.db == 'prod'): logger.info('Using production database...') diff --git a/etl/extractor.py b/etl/extractor.py index 98f25a5..5203793 100644 --- a/etl/extractor.py +++ b/etl/extractor.py @@ -115,15 +115,15 @@ def get_mdb_data(self, payload): # Declare entity dict entity_dict = dict() - # Get list of table from database - table_list = pandas_access.list_tables(payload.filename) + # Get list of table from database + table_list = pandas_access.list_tables(payload.filename) - # Update progress bar job count - self.job_count += len(table_list) - self.pbar.total = self.job_count + # Update progress bar job count + self.job_count += len(table_list) + self.pbar.total = self.job_count # Iterate through each table in database - for tbl in table_list: + for tbl in table_list: self.logger.debug('Extracting table: \'%s\' from file: %s...', tbl, payload.filename) # Issue: Default pandas integer type is not nullable - null values in integer column causes read error # Workaround: Read integer as Int64 (pandas nullable integer type in pandas) @@ -134,6 +134,7 @@ def get_mdb_data(self, payload): self.pbar.update() return entity_dict finally: + self.logger.debug('Removing intermediate file: %s...', payload.filename) utils.silentremove(payload.filename) # Extract .dbf data @@ -158,6 +159,7 @@ def get_dbf_data(self, payload): # Return Entity object return {payload.filename: dataframe} finally: + self.logger.debug('Removing intermediate file: %s...', payload.filename) utils.silentremove(payload.filename) # Extract .shp data @@ -183,4 +185,5 @@ def get_shp_data(self, archive, shapefile): return {shapefile.filename: dataframe} finally: + self.logger.debug('Removing intermediate directory: %s...', SCRATCH_DIR) 
shutil.rmtree(SCRATCH_DIR) diff --git a/etl/loader.py b/etl/loader.py index a9423a9..01189bc 100644 --- a/etl/loader.py +++ b/etl/loader.py @@ -67,7 +67,7 @@ def insert(self, tablename, table_df, chunk_size = 1000): ''' # If table doesn't exist, Create. if not self._engine.dialect.has_table(self._engine, tablename): - self.logger.debug("Inserting table ", tablename, " in ", self.credentials['db_name'], "...") + self.logger.debug("Inserting table %s into database %s", tablename, self.credentials['db_name'], "...") table_df.iloc[:0].to_sql(tablename, self._engine, if_exists='fail') # insert new table, drop old table if exists table_df.to_sql(name=tablename, con = self._engine, if_exists = 'replace', chunksize = chunk_size) From 0e5e83781a85f5d0892b69813b71a2c4fdaee7fd Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sat, 11 Apr 2020 15:21:24 -0500 Subject: [PATCH 06/19] loader.py: fix debug message argument error --- etl/loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/loader.py b/etl/loader.py index 01189bc..f1c3515 100644 --- a/etl/loader.py +++ b/etl/loader.py @@ -67,7 +67,7 @@ def insert(self, tablename, table_df, chunk_size = 1000): ''' # If table doesn't exist, Create. 
if not self._engine.dialect.has_table(self._engine, tablename): - self.logger.debug("Inserting table %s into database %s", tablename, self.credentials['db_name'], "...") + self.logger.debug("Inserting table %s into database %s...", tablename, self.credentials['db_name']) table_df.iloc[:0].to_sql(tablename, self._engine, if_exists='fail') # insert new table, drop old table if exists table_df.to_sql(name=tablename, con = self._engine, if_exists = 'replace', chunksize = chunk_size) From 18aa332b7cc27170b3f1e06d6683644d59049d52 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sat, 11 Apr 2020 17:18:26 -0500 Subject: [PATCH 07/19] Progress bar: change to singleton implementation --- app.py | 23 ++++++------- etl/extractor.py | 7 ++-- etl/fetcher.py | 7 ++-- etl/fetcher_local.py | 7 ++-- etl/loader.py | 5 +-- etl/parser.py | 7 ++-- etl/progress_bar_manager.py | 65 +++++++++++++++++++++++++++++++++++++ etl/transformer.py | 7 ++-- 8 files changed, 100 insertions(+), 28 deletions(-) create mode 100644 etl/progress_bar_manager.py diff --git a/app.py b/app.py index c3c8077..c6efc64 100644 --- a/app.py +++ b/app.py @@ -6,8 +6,9 @@ import sys import logging.config from etl.constants import * -from etl import command_line_args, extractor, fetcher, fetcher_local, loader, parser, transformer, utils -import enlighten +from etl import command_line_args, extractor, fetcher, fetcher_local, loader, \ +parser, transformer, utils +from etl.progress_bar_manager import ProgressBarManager ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -17,9 +18,9 @@ # Setup logging logging.config.fileConfig('data/logger/config.ini') logger = logging.getLogger(__name__) - # Setup progress bar manager - pbar_manager = enlighten.get_manager() - + # Setup progress bar manager to keep track of multiple progress bars + pbar_manager = ProgressBarManager() + # notify user if the app will be using test or prod db if (commandLineArgs.db == 'prod'): logger.info('Using production database...') @@ -33,30 +34,30 
@@ # Fetcher if (commandLineArgs.local_sources): logger.info("Using local data files: {}".format(' '.join(map(str, commandLineArgs.local_sources)))) - fetcher = fetcher_local.FetcherLocal(pbar_manager) + fetcher = fetcher_local.FetcherLocal() filenames = commandLineArgs.local_sources responses = fetcher.fetch_all(filenames) else: - fetcher = fetcher.Fetcher(pbar_manager) + fetcher = fetcher.Fetcher() src_yaml = utils.get_yaml('data/sources/sources.yml') responses = fetcher.fetch_all(src_yaml) # Parser - parser = parser.Parser(pbar_manager) + parser = parser.Parser() responses = parser.parse_all(responses) # Extractor - extractor = extractor.Extractor(pbar_manager) + extractor = extractor.Extractor() entity_dict = extractor.extract_all(responses) # Transformer transform_tasks = utils.get_yaml('data/transform_tasks/transform_tasks.yml') - transformer = transformer.Transformer(pbar_manager) + transformer = transformer.Transformer() transformed_dict = transformer.transform_all(entity_dict, transform_tasks) # Loader # read loader config - loader = loader.Loader(db_yaml, pbar_manager) + loader = loader.Loader(db_yaml) # connect to database loader.connect() for tablename, transformed_df in transformed_dict.items(): diff --git a/etl/extractor.py b/etl/extractor.py index 5203793..40465e6 100644 --- a/etl/extractor.py +++ b/etl/extractor.py @@ -13,6 +13,7 @@ import logging from etl import utils from etl.constants import * +from etl.progress_bar_manager import ProgressBarManager class Extractor: @@ -22,8 +23,8 @@ class Extractor: ''' # Initializer / Instance Attributes - def __init__(self, pbar_manager): - self.pbar_manager = pbar_manager + def __init__(self): + self.pbar_manager = ProgressBarManager() self.job_count = 0 self.logger = logging.getLogger(__name__) @@ -34,7 +35,7 @@ def extract_all(self, responses): # Setup progress bar self.job_count = sum(map(lambda response: response.payload_count(), responses)) # self.logger.debug("self.job_count: %s",self.job_count) - 
self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') + self.pbar = self.pbar_manager.add_pbar(self.job_count, __name__, 'files') # Prepare nested dictionary to store extracted table lists entity_dict = dict() diff --git a/etl/fetcher.py b/etl/fetcher.py index 174f380..237b6f2 100755 --- a/etl/fetcher.py +++ b/etl/fetcher.py @@ -13,13 +13,14 @@ from urllib.parse import urlparse from urllib.error import URLError import logging +from etl.progress_bar_manager import ProgressBarManager class Fetcher: # Initializer / Instance Attributes - def __init__(self, pbar_manager): - self.pbar_manager = pbar_manager + def __init__(self): + self.pbar_manager = ProgressBarManager() self.logger = logging.getLogger(__name__) def fetch_all(self, src_yaml): @@ -28,7 +29,7 @@ def fetch_all(self, src_yaml): ''' # Setup progress bar self.job_count = len(src_yaml.keys()) - self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') + self.pbar = self.pbar_manager.add_pbar(self.job_count, __name__, 'files') data = [] for key in src_yaml.keys(): diff --git a/etl/fetcher_local.py b/etl/fetcher_local.py index 7d092f7..b0517b1 100644 --- a/etl/fetcher_local.py +++ b/etl/fetcher_local.py @@ -4,6 +4,7 @@ import os from posixpath import basename import logging +from etl.progress_bar_manager import ProgressBarManager # local testing replacement for Fetcher # load a subset of data from local files instead @@ -11,14 +12,14 @@ class FetcherLocal: # Initializer / Instance Attributes - def __init__(self, pbar_manager): - self.pbar_manager = pbar_manager + def __init__(self): + self.pbar_manager = ProgressBarManager() self.logger = logging.getLogger(__name__) def fetch_all(self, filenames): # Setup progress bar self.job_count = len(filenames) - self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') + self.pbar = self.pbar_manager.add_pbar(self.job_count, __name__, 'files') fetchedFiles = [] diff --git 
a/etl/loader.py b/etl/loader.py index f1c3515..34b6022 100644 --- a/etl/loader.py +++ b/etl/loader.py @@ -3,6 +3,7 @@ from etl.utils import xstr from io import StringIO import logging +from etl.progress_bar_manager import ProgressBarManager class Loader: ''' @@ -16,10 +17,10 @@ class Loader: _metadata = None # Initializer / Instance Attributes - def __init__(self, config_yaml, pbar_manager): + def __init__(self, config_yaml): # Get credentials from YAML self.credentials = self.get_credentials(config_yaml,'database_credentials') - self.pbar_manager = pbar_manager + self.pbar_manager = ProgressBarManager() self.logger = logging.getLogger(__name__) # Get credentials from yaml config diff --git a/etl/parser.py b/etl/parser.py index 13807af..61c3144 100644 --- a/etl/parser.py +++ b/etl/parser.py @@ -2,18 +2,19 @@ import zipfile import logging from etl.constants import * +from etl.progress_bar_manager import ProgressBarManager class Parser: # Initializer / Instance Attributes - def __init__(self, pbar_manager): - self.pbar_manager = pbar_manager + def __init__(self): + self.pbar_manager = ProgressBarManager() self.logger = logging.getLogger(__name__) def parse_all(self, responses): # Setup progress bar self.job_count = len(responses) - self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__, unit='files') + self.pbar = self.pbar_manager.add_pbar(self.job_count, __name__, 'files') for response in responses: try: self.logger.debug('Parsing file: %s...', response.name) diff --git a/etl/progress_bar_manager.py b/etl/progress_bar_manager.py new file mode 100644 index 0000000..7dd00c7 --- /dev/null +++ b/etl/progress_bar_manager.py @@ -0,0 +1,65 @@ +import logging +import enlighten + + + +# class ProgressBarManager: +# __instance = None +# @staticmethod +# def getInstance(): +# """ Static access method. 
""" +# if ProgressBarManager.__instance == None: +# logging.debug('Delay creation of ProgressBarManager object...') +# ProgressBarManager() +# return ProgressBarManager.__instance +# +# def __init__(self): +# """ Virtually private constructor. """ +# if ProgressBarManager.__instance != None: +# raise Exception("This class is a singleton!") +# else: +# logging.debug('Creating ProgressBarManager object...') +# ProgressBarManager.__instance = enlighten.get_manager() + +class ProgressBarManager(object): + _instance = None + + class __ProgressBarManager: + """ private class """ + def __init__(self): + """ private constructor """ + self._manager = enlighten.get_manager() + + def add_pbar(self, job_count, name, unit): + pbar = self._manager.counter(total=job_count, desc=name, unit=unit) + return pbar + + # def __str__(self): + # logging.debug('Calling private __str__') + # return self._manager + + def __new__(cls): + """ + Arguments: + Creates and returns ProgressBar object if it doesn't exist; + Returns ProgressBar object if it exists from previous initialization. 
+ note: __new__ called at instances creation before __init__ + """ + if not ProgressBarManager._instance: + ProgressBarManager._instance = ProgressBarManager.__ProgressBarManager() + logging.debug('Creating ProgressBarManager object...') + else: + logging.debug('Using existing ProgressBarManager object...') + return ProgressBarManager._instance + + + # def update(self): + # self.pbar.update() + + def __getattr__(self, name): + logging.debug('Calling __getattr__') + return getattr(self._instance, name) + + # def __setattr__(self, name): + # logging.debug('Calling __setattr__') + # return setattr(self.instance, name) diff --git a/etl/transformer.py b/etl/transformer.py index df889e3..cdeb97a 100644 --- a/etl/transformer.py +++ b/etl/transformer.py @@ -1,9 +1,10 @@ from .transformer_tasks.vacant_table.vacant_table import vacant_table import logging +from etl.progress_bar_manager import ProgressBarManager class Transformer: - def __init__(self, pbar_manager): - self.pbar_manager = pbar_manager + def __init__(self): + self.pbar_manager = ProgressBarManager() self.logger = logging.getLogger(__name__) self.transform_tasks_to_transform_fns = { 'vacant_table': vacant_table @@ -29,7 +30,7 @@ def transform_all(self, extracted, transform_task_list): # Setup progress bar self.job_count = len(transform_task_list) self.logger.debug("transform job_count: %s", self.job_count) - self.pbar = self.pbar_manager.counter(total=self.job_count, desc=__name__) + self.pbar = self.pbar_manager.add_pbar(self.job_count, __name__, 'files') transformed = {} From 62ea3f3479389e6f3d8933bd3ec0bdcf508c6e5e Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sat, 11 Apr 2020 18:43:24 -0500 Subject: [PATCH 08/19] Progress bar manager: added function to get child progress bars --- app.py | 3 -- etl/progress_bar_manager.py | 62 +++++++++++++------------------------ 2 files changed, 22 insertions(+), 43 deletions(-) diff --git a/app.py b/app.py index c6efc64..e98fd5c 100644 --- a/app.py +++ b/app.py @@ -8,7 
+8,6 @@ from etl.constants import * from etl import command_line_args, extractor, fetcher, fetcher_local, loader, \ parser, transformer, utils -from etl.progress_bar_manager import ProgressBarManager ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -18,8 +17,6 @@ # Setup logging logging.config.fileConfig('data/logger/config.ini') logger = logging.getLogger(__name__) - # Setup progress bar manager to keep track of multiple progress bars - pbar_manager = ProgressBarManager() # notify user if the app will be using test or prod db if (commandLineArgs.db == 'prod'): diff --git a/etl/progress_bar_manager.py b/etl/progress_bar_manager.py index 7dd00c7..7ce3af7 100644 --- a/etl/progress_bar_manager.py +++ b/etl/progress_bar_manager.py @@ -2,64 +2,46 @@ import enlighten - -# class ProgressBarManager: -# __instance = None -# @staticmethod -# def getInstance(): -# """ Static access method. """ -# if ProgressBarManager.__instance == None: -# logging.debug('Delay creation of ProgressBarManager object...') -# ProgressBarManager() -# return ProgressBarManager.__instance -# -# def __init__(self): -# """ Virtually private constructor. 
""" -# if ProgressBarManager.__instance != None: -# raise Exception("This class is a singleton!") -# else: -# logging.debug('Creating ProgressBarManager object...') -# ProgressBarManager.__instance = enlighten.get_manager() - class ProgressBarManager(object): _instance = None class __ProgressBarManager: """ private class """ + _pbar_dict = dict() + def __init__(self): """ private constructor """ self._manager = enlighten.get_manager() + def __str__(self): + return f'a {self._manager} , {self._pbar_dict}' + + def __repr__(self): + return f'{self.__class__.__name__}('f'{self._manager!r}, {self._pbar_dict!r})' + def add_pbar(self, job_count, name, unit): + """ add progress bar to manager """ pbar = self._manager.counter(total=job_count, desc=name, unit=unit) + self._pbar_dict[name] = pbar return pbar - # def __str__(self): - # logging.debug('Calling private __str__') - # return self._manager + def get_pbar(self, name): + """ get progress bar to manager using name""" + if name not in self._pbar_dict: + logging.debug('ProgressBarManager cannot find progress bar with the name %s', name) + return None + return self._pbar_dict[name] def __new__(cls): """ - Arguments: - Creates and returns ProgressBar object if it doesn't exist; - Returns ProgressBar object if it exists from previous initialization. - note: __new__ called at instances creation before __init__ + Creates and returns ProgressBar object not initialized; + Returns ProgressBar object if initialized previously. 
""" if not ProgressBarManager._instance: ProgressBarManager._instance = ProgressBarManager.__ProgressBarManager() - logging.debug('Creating ProgressBarManager object...') + logging.debug('Creating new ProgressBarManager instance (%s)', + hex(id(ProgressBarManager._instance))) else: - logging.debug('Using existing ProgressBarManager object...') + logging.debug('Using existing ProgressBarManager instance (%s)', + hex(id(ProgressBarManager._instance))) return ProgressBarManager._instance - - - # def update(self): - # self.pbar.update() - - def __getattr__(self, name): - logging.debug('Calling __getattr__') - return getattr(self._instance, name) - - # def __setattr__(self, name): - # logging.debug('Calling __setattr__') - # return setattr(self.instance, name) From bc86195b56ae61021a56901ac9c6543ddfb2e1a1 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sat, 11 Apr 2020 19:10:36 -0500 Subject: [PATCH 09/19] loader.py: added load_all() function --- app.py | 6 +----- etl/loader.py | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/app.py b/app.py index e98fd5c..3164aa4 100644 --- a/app.py +++ b/app.py @@ -53,9 +53,5 @@ transformed_dict = transformer.transform_all(entity_dict, transform_tasks) # Loader - # read loader config loader = loader.Loader(db_yaml) - # connect to database - loader.connect() - for tablename, transformed_df in transformed_dict.items(): - loader.insert(tablename, transformed_df) + loader.load_all(transformed_dict) diff --git a/etl/loader.py b/etl/loader.py index 34b6022..eb94d9a 100644 --- a/etl/loader.py +++ b/etl/loader.py @@ -18,10 +18,23 @@ class Loader: # Initializer / Instance Attributes def __init__(self, config_yaml): - # Get credentials from YAML - self.credentials = self.get_credentials(config_yaml,'database_credentials') self.pbar_manager = ProgressBarManager() self.logger = logging.getLogger(__name__) + # Get credentials from YAML + self.credentials = 
self.get_credentials(config_yaml,'database_credentials') + # connect to database + self.connect() + + def load_all(self, transformed_dict): + # Setup progress bar + self.job_count = len(transformed_dict) + self.pbar = self.pbar_manager.add_pbar(self.job_count, __name__, 'tables') + + # Load tables + for tablename, transformed_df in transformed_dict.items(): + self.insert(tablename, transformed_df) + # update progress bar + self.pbar.update() # Get credentials from yaml config def get_credentials(self, config_yaml, match_key): From e7accfd4a7bdbf1f1b2e25e63f44fe1ab1355edf Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 12 Apr 2020 23:31:26 -0500 Subject: [PATCH 10/19] test_fetcher.py: script to run fetcher standalone --- etl/fetcher.py | 1 - requirements.txt | 8 ++++ tests/test_fetcher.py | 95 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 tests/test_fetcher.py diff --git a/etl/fetcher.py b/etl/fetcher.py index 237b6f2..dca944f 100755 --- a/etl/fetcher.py +++ b/etl/fetcher.py @@ -4,7 +4,6 @@ import ssl import sys import yaml -from app import ROOT_DIR from etl.fetcher_response import FetcherResponse from etl.payload_data import PayloadData from etl.utils import get_file_name_from_uri diff --git a/requirements.txt b/requirements.txt index cac565c..76e83c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,17 +9,24 @@ dbfread==2.0.7 enlighten==1.5.1 Fiona==1.8.13 geopandas==0.6.2 +importlib-metadata==1.6.0 isort==4.3.21 lazy-object-proxy==1.4.3 mccabe==0.6.1 +more-itertools==8.2.0 munch==2.5.0 numpy==1.18.1 +packaging==20.3 pandas==0.25.3 pandas-access==0.0.1 +pluggy==0.13.1 +py==1.8.1 pycodestyle==2.5.0 pylint==2.4.4 PyMySQL==0.9.3 +pyparsing==2.4.7 pyproj==2.4.2.post1 +pytest==5.4.1 python-dateutil==2.8.1 python-utils==2.4.0 pytz==2019.3 @@ -30,3 +37,4 @@ SQLAlchemy==1.3.12 typed-ast==1.4.0 wcwidth==0.1.9 wrapt==1.11.2 +zipp==3.1.0 diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py 
new file mode 100644 index 0000000..3344f5f --- /dev/null +++ b/tests/test_fetcher.py @@ -0,0 +1,95 @@ +''' +test_fetcher.py +Script to test 'etl/fetcher.py' +''' +import os +import sys +import logging.config +import pytest +# Add project path to python sys.path +dirname = os.path.dirname(os.path.abspath(__file__)) +proj_dir = os.path.join(dirname, '..') +sys.path.append(proj_dir) +from etl import utils +from etl.fetcher_response import FetcherResponse +from etl.fetcher import Fetcher + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Fetcher(): + ''' + Integration Test + ''' + # Specify source yaml to use + src_yaml_path = 'data/sources/test_sources.yml' + # Run fetcher + responses = run_fetcher(src_yaml_path) + # Validate output + # assert False + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Fetcher_class(): + ''' + Test Fetcher class instantiation + ''' + pass + + +@pytest.mark.skip(reason="test not yet implemented") +def test_fetch(): + ''' + Test fetch() function in Fetcher class + ''' + pass + + +@pytest.mark.skip(reason="test not yet implemented") +def test_fetch_all(): + ''' + Test fetch_all() function in Fetcher class + ''' + pass + + +def run_fetcher(src_yaml_path): + ''' + Run Fetcher in the same way as 'app.py' + ''' + fetcher = Fetcher() + src_yaml = utils.get_yaml(src_yaml_path) + responses = fetcher.fetch_all(src_yaml) + return responses + + +def setup_logging(): + ''' + Setup logger from config.ini + ''' + # Setup logging + logging_config = os.path.join(dirname, '../data/logger/config.ini') + logging.config.fileConfig(logging_config) + logger = logging.getLogger(__name__) + return logger + + +if __name__ == '__main__': + ''' + execute only if run as a script + ''' + # Set up logger + logger = setup_logging() + + # Parse Command Args for source yaml to use + if len(sys.argv) == 1: + # if not specified, use test_sources.yml + src_yaml_path = 'data/sources/test_sources.yml' + else: + # use yaml specified in command 
args + src_yaml_path = sys.argv[1] + logger.info("Using source yaml: %s", src_yaml_path) + + # Run Fetcher + logger.info("Running Fetcher standalone...") + responses = run_fetcher(src_yaml_path) + logger.debug(responses) From 274cb136a8051787e471719d7a7e77f2c37ee7ff Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 12 Apr 2020 23:33:04 -0500 Subject: [PATCH 11/19] test_parser.py, test_extractor.py: run parser and extractor standalone --- etl/command_line_args.py | 8 +-- tests/test_extractor.py | 101 ++++++++++++++++++++++++++++++++++++++ tests/test_parser.py | 103 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 209 insertions(+), 3 deletions(-) create mode 100644 tests/test_extractor.py create mode 100644 tests/test_parser.py diff --git a/etl/command_line_args.py b/etl/command_line_args.py index db9c973..71b7b9b 100644 --- a/etl/command_line_args.py +++ b/etl/command_line_args.py @@ -1,8 +1,10 @@ import argparse -def getCommandLineArgs(): +def getCommandLineArgs(local_source=True, db=True): parser = argparse.ArgumentParser() - parser.add_argument('--db', nargs='?', type=str, choices=['dev','prod'], default='dev', help='dev: use local database; prod: use production database') - parser.add_argument('--local-sources', nargs='+', type=str, help='local data files to use in place of internet sources.') + if db: + parser.add_argument('--db', nargs='?', type=str, choices=['dev','prod'], default='dev', help='dev: use local database; prod: use production database') + if local_source: + parser.add_argument('--local-sources', nargs='+', type=str, help='local data files to use in place of internet sources.') args = parser.parse_args() return args diff --git a/tests/test_extractor.py b/tests/test_extractor.py new file mode 100644 index 0000000..6927eeb --- /dev/null +++ b/tests/test_extractor.py @@ -0,0 +1,101 @@ +''' +test_extractor.py +Script to test 'etl/extractor.py' +''' +import os +import sys +import logging.config +import pytest +# Add project path to python 
sys.path +dirname = os.path.dirname(os.path.abspath(__file__)) +proj_dir = os.path.join(dirname, '..') +sys.path.append(proj_dir) +from tests.test_parser import create_mock_fetcher_responses, run_parser +from etl import utils, command_line_args +from etl.extractor import Extractor + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Extractor(): + ''' + Integration Test + ''' + # Validate output + assert False + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Extractor_class(): + ''' + Test Extractor class instantiation + ''' + pass + + +@pytest.mark.skip(reason="test not yet implemented") +def test_fetch(): + ''' + Test fetch() function in Extractor class + ''' + pass + + +@pytest.mark.skip(reason="test not yet implemented") +def test_fetch_all(): + ''' + Test fetch_all() function in Extractor class + ''' + pass + + +def run_extractor(src_yaml_path): + ''' + Run Extractor in the same way as 'app.py' + ''' + extractor = Extractor() + src_yaml = utils.get_yaml(src_yaml_path) + responses = extractor.fetch_all(src_yaml) + return responses + + +def setup_logging(): + ''' + Setup logger from config.ini + ''' + # Setup logging + logging_config = os.path.join(dirname, '../data/logger/config.ini') + logging.config.fileConfig(logging_config) + logger = logging.getLogger(__name__) + return logger + +if __name__ == '__main__': + ''' + execute only if run as a script + ''' + # Set up logger + logger = setup_logging() + + # Prase command line arguments, only parse local source args + commandLineArgs = command_line_args.getCommandLineArgs(db=False) + logger.info("Using local data files: {}".format( + ' '.join(map(str, commandLineArgs.local_sources)))) + + # Parse command line args for filenames + filenames = commandLineArgs.local_sources + # Create mock fetched responses + mock_fetcher_responses = create_mock_fetcher_responses(filenames) + + # Run Parser + mock_parsed_responses = run_parser(mock_fetcher_responses) + + # Run Extractor + 
logger.info("Running Extractor standalone...") + extractor = Extractor() + entity_dict = extractor.extract_all(mock_parsed_responses) + + # List tables and their attributes + for key, df in entity_dict.items(): + # filename of dataframe + filename = 'src/'+key+'.csv' + logger.info("Saving table %s to %s", key, filename) + utils.to_csv(df, filename) diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000..46c5991 --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,103 @@ +''' +test_parser.py +Script to test 'etl/parser.py' +''' +import os +import sys +import logging.config +import pytest +# Add project path to python sys.path +dirname = os.path.dirname(os.path.abspath(__file__)) +proj_dir = os.path.join(dirname, '..') +sys.path.append(proj_dir) +from etl import utils, command_line_args +from etl.fetcher_local import FetcherLocal +from etl.parser import Parser + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Parser(): + ''' + Integration Test + ''' + # Specify filenames + filenames = [proj_dir+'/src/prcl_shape.zip'] + # Create mock fetched responses + mock_fetcher_responses = create_mock_fetcher_responses(filenames) + # Run Parser + responses = run_parser(mock_fetcher_responses) + # Validate output + assert False + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Parser_class(): + ''' + Test Parser class instantiation + ''' + pass + + +@pytest.mark.skip(reason="test not yet implemented") +def test_fetch(): + ''' + Test fetch() function in Parser class + ''' + pass + + +@pytest.mark.skip(reason="test not yet implemented") +def test_fetch_all(): + ''' + Test fetch_all() function in Parser class + ''' + pass + + +def run_parser(responses): + ''' + Run Parser in the same way as 'app.py' + ''' + parser = Parser() + parsed_responses = parser.parse_all(responses) + return parsed_responses + + +def create_mock_fetcher_responses(filenames): + # Use FetcherLocal to create mock responses + fetcher = 
FetcherLocal() + mock_fetcher_response = fetcher.fetch_all(filenames) + return mock_fetcher_response + + +def setup_logging(): + ''' + Setup logger from config.ini + ''' + # Setup logging + logging_config = os.path.join(dirname, '../data/logger/config.ini') + logging.config.fileConfig(logging_config) + logger = logging.getLogger(__name__) + return logger + + +if __name__ == '__main__': + ''' + execute only if run as a script + ''' + # Set up logger + logger = setup_logging() + + # Parse command line args for filenames + commandLineArgs = command_line_args.getCommandLineArgs(db=False) + logger.info("Using local data files: {}".format( + ' '.join(map(str, commandLineArgs.local_sources)))) + filenames = commandLineArgs.local_sources + + # Create mock fetched responses + mock_fetcher_responses = create_mock_fetcher_responses(filenames) + + # Run Parser + logger.info("Running Parser standalone...") + responses = run_parser(mock_fetcher_responses) + logger.debug(responses) From 2d03e9b63711f96389cda68f0271f9c09b3d6bc2 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 12 Apr 2020 23:34:24 -0500 Subject: [PATCH 12/19] app.py: changed import calls such that library and variable name don't clash --- app.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/app.py b/app.py index 3164aa4..caac06a 100644 --- a/app.py +++ b/app.py @@ -6,8 +6,13 @@ import sys import logging.config from etl.constants import * -from etl import command_line_args, extractor, fetcher, fetcher_local, loader, \ -parser, transformer, utils +from etl import command_line_args, utils +from etl.fetcher import Fetcher +from etl.fetcher_local import FetcherLocal +from etl.extractor import Extractor +from etl.parser import Parser +from etl.transformer import Transformer +from etl.loader import Loader ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -31,27 +36,27 @@ # Fetcher if (commandLineArgs.local_sources): logger.info("Using local data files: {}".format(' 
'.join(map(str, commandLineArgs.local_sources)))) - fetcher = fetcher_local.FetcherLocal() + fetcher = FetcherLocal() filenames = commandLineArgs.local_sources responses = fetcher.fetch_all(filenames) else: - fetcher = fetcher.Fetcher() + fetcher = Fetcher() src_yaml = utils.get_yaml('data/sources/sources.yml') responses = fetcher.fetch_all(src_yaml) # Parser - parser = parser.Parser() + parser = Parser() responses = parser.parse_all(responses) # Extractor - extractor = extractor.Extractor() + extractor = Extractor() entity_dict = extractor.extract_all(responses) # Transformer transform_tasks = utils.get_yaml('data/transform_tasks/transform_tasks.yml') - transformer = transformer.Transformer() + transformer = Transformer() transformed_dict = transformer.transform_all(entity_dict, transform_tasks) # Loader - loader = loader.Loader(db_yaml) + loader = Loader(db_yaml) loader.load_all(transformed_dict) From 23ec6dcc8b2331a339dd84debbc6d090bd1c704f Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 12 Apr 2020 23:36:28 -0500 Subject: [PATCH 13/19] test_transformer.py: run transformer standalone, added utils functions to help import and export csv preserving datatype metadata --- etl/utils.py | 21 +++++++++ tests/test_transformer.py | 94 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 tests/test_transformer.py diff --git a/etl/utils.py b/etl/utils.py index 28a1e37..d239922 100644 --- a/etl/utils.py +++ b/etl/utils.py @@ -2,6 +2,7 @@ import errno import yaml import zipfile +import pandas as pd from etl.payload_data import PayloadData from io import BytesIO from posixpath import basename @@ -64,3 +65,23 @@ def xstr(s): if s is None: return '' return str(s) + +# Export to csv preserving datatype +def to_csv(df, filename): + # Prepend dtypes to the top of df + df.loc[-1] = df.dtypes + df.index = df.index + 1 + df.sort_index(inplace=True) + # Then save it to a csv + df.to_csv(filename, index=False) + +# Import from csv
preserving datatype +def read_csv(filename): + # Read types first line of csv + dtypes = pd.read_csv(filename, nrows=1).iloc[0].to_dict() + # Replace unrecognized dtype with 'object' + for attribute,dtype in dtypes.items(): + if str(dtype) == 'geometry': + dtypes[attribute] = 'object' + # Read the rest of the lines with the types from above + return pd.read_csv(filename, dtype=dtypes, skiprows=[1]) diff --git a/tests/test_transformer.py b/tests/test_transformer.py new file mode 100644 index 0000000..976dfb6 --- /dev/null +++ b/tests/test_transformer.py @@ -0,0 +1,94 @@ +''' +test_transformer.py +Script to test 'etl/transformer.py' +''' +import os +import sys +import logging.config +import pytest +# Add project path to python sys.path +dirname = os.path.dirname(os.path.abspath(__file__)) +proj_dir = os.path.join(dirname, '..') +sys.path.append(proj_dir) +from etl import utils, command_line_args +from etl.transformer import Transformer + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Transformer(): + ''' + Integration Test + ''' + assert False + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Transformer_class(): + ''' + Test Transformer class instantiation + ''' + pass + + +@pytest.mark.skip(reason="test not yet implemented") +def test_transform_all(): + ''' + Test transform_all() function in Transformer class + ''' + pass + + +def run_transformer(transform_yaml_path, extracted_dict): + ''' + Run Transformer in the same way as 'app.py' + ''' + transform_tasks = utils.get_yaml(transform_yaml_path) + transformer = Transformer() + transformed_dict = transformer.transform_all(extracted_dict, transform_tasks) + return transformed_dict + +def create_mock_extractor_dict(filenames, entry_limit=100): + # Read in csv files as DataFrame + entity_dict = dict() + for filename in filenames: + tablename = os.path.splitext(os.path.basename(filename))[0] + entity_dict[tablename]=utils.read_csv(filename) + return entity_dict + + +def 
setup_logging(): + ''' + Setup logger from config.ini + ''' + # Setup logging + logging_config = os.path.join(dirname, '../data/logger/config.ini') + logging.config.fileConfig(logging_config) + logger = logging.getLogger(__name__) + return logger + +if __name__ == '__main__': + ''' + execute only if run as a script + ''' + # Set up logger + logger = setup_logging() + + # Parse command line args for filenames + commandLineArgs = command_line_args.getCommandLineArgs(db=False) + logger.info("Using local data files: {}".format( + ' '.join(map(str, commandLineArgs.local_sources)))) + filenames = commandLineArgs.local_sources + + # Specify transform task yaml to use + transform_yaml_path = 'data/transform_tasks/transform_tasks.yml' + + # Create mock post-extraction dataframe dictionary + logger.info("Importing files into dataframes...") + mock_extracted_dict = create_mock_extractor_dict(filenames) + # logger.debug(mock_extracted_dict) + + # Run transformer + logger.info("Running Transformer standalone...") + transformed_dict = run_transformer(transform_yaml_path, mock_extracted_dict) + for tablename, df in transformed_dict.items(): + utils.to_csv(df, 'src/'+tablename+'.csv') From 07f6dd08e300ab6f6d94553ec6aeed38408aa5e9 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 12 Apr 2020 23:37:17 -0500 Subject: [PATCH 14/19] vacant_table.py: changed to_csv to custom utils.to_csv to preserve datatype metadata --- README.md | 37 +++++++++++++++++++ .../vacant_table/vacant_table.py | 3 +- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 30cdc72..1e66247 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,44 @@ python3 ./app.py --db prod ``` :warning: Example 3. will not work if you don't have the database admin credentials. For more details, [Go to Running with Production Database](#running-with-production-database). +#### Running individual stages +To run individual stages (e.g.,
Fetcher only, Transformer only) without running the entire application, use the following commands: +1. Run `Fetcher` only: +Run with default `test_sources.yml`: +```bash +python3 tests/test_fetcher.py +``` +To run with specific source YAML, run the following command replacing last argument with path to custom YAML: +```bash +python3 tests/test_fetcher.py ./data/sources/sources.yml +``` + +2. Run `Parser` only: +Use --local-sources to specify local files to parse: +```bash +python3 tests/test_parser.py --local-sources ./src/prcl.mdb ./src/par.dbf ./src/prcl_shape.zip +``` + +3. Run `Extractor` only: +```bash +python3 tests/test_extractor.py --local-sources ./src/prcl.mdb ./src/par.dbf ./src/prcl_shape.zip +``` +4. Run `Transformer` only: +```bash +python3 tests/test_transformer.py --local-sources src/BldgCom.csv src/BldgRes.csv src/par.dbf.csv src/prcl.shp.csv src/Prcl.csv +``` + +5. Run `Loader` only: +```bash +python3 tests/test_loader.py --local-sources ./src/vacant_table.csv +``` + +#### Running unit tests +To run unit tests, run the following command from project root directory: +```bash +pytest +``` #### Deactivating Virtual Environment diff --git a/etl/transformer_tasks/vacant_table/vacant_table.py b/etl/transformer_tasks/vacant_table/vacant_table.py index c3ee800..756fca2 100644 --- a/etl/transformer_tasks/vacant_table/vacant_table.py +++ b/etl/transformer_tasks/vacant_table/vacant_table.py @@ -1,6 +1,7 @@ from .map_parcel_data_to_vacant_table import map_parcel_data_to_vacant_table, vacant_table_fields from .merge_parcel_data import merge_parcel_data import logging +from etl import utils def keep_only_select_columns_in_df(df, columnsToKeep): df.drop(df.columns.difference(columnsToKeep), axis=1, inplace=True) @@ -32,6 +33,6 @@ def vacant_table(df): logging.debug(merged_parcel_data) - merged_parcel_data.to_csv('transform_vacant_table.csv', index=False) + utils.to_csv(merged_parcel_data, 'transform_vacant_table.csv') return merged_parcel_data From 
b7e0627e9c91f0ac4a393e53cf2ba28d20f841ee Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 12 Apr 2020 23:37:41 -0500 Subject: [PATCH 15/19] test_loader: run loader standalone --- tests/test_loader.py | 89 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/test_loader.py diff --git a/tests/test_loader.py b/tests/test_loader.py new file mode 100644 index 0000000..8ef2abd --- /dev/null +++ b/tests/test_loader.py @@ -0,0 +1,89 @@ +''' +test_loader.py +Script to test 'etl/loader.py' +''' +import os, sys +import logging.config +import pytest +# Add project path to python sys.path +dirname = os.path.dirname(os.path.abspath(__file__)) +proj_dir = os.path.join(dirname, '..') +sys.path.append(proj_dir) +# print(sys.path) +from etl.loader import Loader +from etl import utils, command_line_args + + +@pytest.mark.skip(reason="test not yet implemented") +def test_Loader(): + ''' + Integration Test + ''' + assert False + +@pytest.mark.skip(reason="test not yet implemented") +def test_Loader_class(): + ''' + Test Loader class instantiation + ''' + pass + +@pytest.mark.skip(reason="test not yet implemented") +def test_fetch(): + ''' + Test fetch() function in Loader class + ''' + pass + +@pytest.mark.skip(reason="test not yet implemented") +def test_fetch_all(): + ''' + Test fetch_all() function in Loader class + ''' + pass + + +def run_loader(db_yaml,transformed_dict): + ''' + Run Loader in the same way as 'app.py' + ''' + loader = Loader(db_yaml) + loader.load_all(transformed_dict) + +def create_mock_transformed_dict(filenames): + transformed_dict = dict() + for filename in filenames: + tablename = os.path.splitext(os.path.basename(filename))[0] + transformed_dict[tablename] = utils.read_csv(filename) + return transformed_dict + +def setup_logging(): + ''' + Setup logger from config.ini + ''' + # Setup logging + logging_config = os.path.join(dirname, '../data/logger/config.ini') + logging.config.fileConfig(logging_config) + 
logger = logging.getLogger(__name__) + return logger + +if __name__ == '__main__': + ''' + execute only if run as a script + ''' + # Set up logger + logger = setup_logging() + + # Parse command line args for filenames + commandLineArgs = command_line_args.getCommandLineArgs(db=False) + logger.info("Using local data files: {}".format( + ' '.join(map(str, commandLineArgs.local_sources)))) + filenames = commandLineArgs.local_sources + + # Create mock post-extraction dataframe dictionary + db_yaml = utils.get_yaml('data/database/config_dev.yml') + transformed_dict = create_mock_transformed_dict(filenames) + + # Run Loader + logger.info("Running Loader standalone...") + run_loader(db_yaml, transformed_dict) From 9138b3c48b4991e87938bd9891b4bb05becde1ae Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Sun, 12 Apr 2020 23:46:20 -0500 Subject: [PATCH 16/19] data/test_*.yml: added test config files to go with test scripts --- .gitignore | 4 +++ data/sources/test_sources.yml | 27 +++++++++++++++++++ data/transform_tasks/test_transform_tasks.yml | 1 + tests/test_transformer.py | 2 +- 4 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 data/sources/test_sources.yml create mode 100644 data/transform_tasks/test_transform_tasks.yml diff --git a/.gitignore b/.gitignore index 7e88748..fb1f03b 100644 --- a/.gitignore +++ b/.gitignore @@ -148,3 +148,7 @@ transform_vacant_table.csv # sqlite test db vacancy.sqlite + +# config files for unit tests +data/source/test_sources.yml +data/transform_tasks/transform_tasks.yml diff --git a/data/sources/test_sources.yml b/data/sources/test_sources.yml new file mode 100644 index 0000000..b4b1ce6 --- /dev/null +++ b/data/sources/test_sources.yml @@ -0,0 +1,27 @@ +ESRI_Parcels_Shapefile: + info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=84 + url: https://www.stlouis-mo.gov/data/upload/data-files/prcl_shape.zip + +Parcels_Key: + info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=83 + url: 
https://www.stlouis-mo.gov/data/upload/data-files/prcl.zip + +# Parcel_Data: +# info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=85 +# url: https://www.stlouis-mo.gov/data/upload/data-files/par.zip +# +# LRA_Iventory_Records: +# info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=65 +# url: https://www.stlouis-mo.gov/data/upload/data-files/lra_public.zip +# +# Building_Inspections: +# info: https://www.stlouis-mo.gov/data/datasets/dataset.cfm?id=11 +# url: https://www.stlouis-mo.gov/data/upload/data-files/bldginsp.zip +# +# Building_Permits: +# info: https://www.stlouis-mo.gov/data/datasets/distribution.cfm?id=3 +# url: https://www.stlouis-mo.gov/data/upload/data-files/prmbdo.zip +# +# Forestry_Property_Maintenance_Data: +# Info: https://www.stlouis-mo.gov/data/datasets/dataset.cfm?id=64 +# url: https://www.stlouis-mo.gov/data/upload/data-files/forestry-maintenance-properties.csv diff --git a/data/transform_tasks/test_transform_tasks.yml b/data/transform_tasks/test_transform_tasks.yml new file mode 100644 index 0000000..15a4a38 --- /dev/null +++ b/data/transform_tasks/test_transform_tasks.yml @@ -0,0 +1 @@ +- vacant_table \ No newline at end of file diff --git a/tests/test_transformer.py b/tests/test_transformer.py index 976dfb6..94bd91f 100644 --- a/tests/test_transformer.py +++ b/tests/test_transformer.py @@ -80,7 +80,7 @@ def setup_logging(): filenames = commandLineArgs.local_sources # Specify transform task yaml to use - transform_yaml_path = 'data/transform_tasks/transform_tasks.yml' + transform_yaml_path = 'data/transform_tasks/test_transform_tasks.yml' # Create mock post-extraction dataframe dictionary logger.info("Importing files into dataframes...") From c94330b2b5530e49cf2da8924db0749a2635c364 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Mon, 13 Apr 2020 18:21:00 -0500 Subject: [PATCH 17/19] vacant_table.py: revert change --- etl/transformer_tasks/vacant_table/vacant_table.py | 3 ++- 1 file changed, 2 insertions(+), 1 
deletion(-) diff --git a/etl/transformer_tasks/vacant_table/vacant_table.py b/etl/transformer_tasks/vacant_table/vacant_table.py index 756fca2..8669321 100644 --- a/etl/transformer_tasks/vacant_table/vacant_table.py +++ b/etl/transformer_tasks/vacant_table/vacant_table.py @@ -33,6 +33,7 @@ def vacant_table(df): logging.debug(merged_parcel_data) - utils.to_csv(merged_parcel_data, 'transform_vacant_table.csv') + merged_parcel_data.to_csv('transform_vacant_table.csv', index=False) + # utils.to_csv(merged_parcel_data, 'transform_vacant_table.csv') return merged_parcel_data From 914fdc8fff2c1f653c95d4ae2d0a56ad7721c394 Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Mon, 13 Apr 2020 18:27:53 -0500 Subject: [PATCH 18/19] .gitignore: ignore test_transfom_tasks.yaml --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index fb1f03b..254e8a6 100644 --- a/.gitignore +++ b/.gitignore @@ -151,4 +151,4 @@ vacancy.sqlite # config files for unit tests data/source/test_sources.yml -data/transform_tasks/transform_tasks.yml +data/transform_tasks/test_transform_tasks.yml From a000cece2bd5237be4861e7fe5c0115dc4fdf9cd Mon Sep 17 00:00:00 2001 From: Karen Wang Date: Mon, 13 Apr 2020 20:10:34 -0500 Subject: [PATCH 19/19] vacant_table.py: revert more changes --- etl/transformer_tasks/vacant_table/vacant_table.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/etl/transformer_tasks/vacant_table/vacant_table.py b/etl/transformer_tasks/vacant_table/vacant_table.py index 8669321..c3ee800 100644 --- a/etl/transformer_tasks/vacant_table/vacant_table.py +++ b/etl/transformer_tasks/vacant_table/vacant_table.py @@ -1,7 +1,6 @@ from .map_parcel_data_to_vacant_table import map_parcel_data_to_vacant_table, vacant_table_fields from .merge_parcel_data import merge_parcel_data import logging -from etl import utils def keep_only_select_columns_in_df(df, columnsToKeep): df.drop(df.columns.difference(columnsToKeep), axis=1, inplace=True) @@ -34,6 +33,5 
@@ def vacant_table(df): logging.debug(merged_parcel_data) merged_parcel_data.to_csv('transform_vacant_table.csv', index=False) - # utils.to_csv(merged_parcel_data, 'transform_vacant_table.csv') return merged_parcel_data