diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..70e785b
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.9
+
+WORKDIR /app
+
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY /data_ingestion .
+COPY /data .
+
+CMD ["python", "__init__.py"]
diff --git a/README.md b/README.md
index ff66979..8ea498e 100644
--- a/README.md
+++ b/README.md
@@ -1,36 +1,30 @@
-# Data Engineer - Technical Assesment
-Our tech teams are curious, driven, intelligent, pragmatic, collaborative and open-minded and you should be too.
+Run PostgreSQL in a Docker container:
 
-## Testing Goals
-We are testing your ability to design and prototype a scalable data-pipeline (with code) underpinned with good data/software engineering principles from a blank canvas.
+    docker run -d \
+        --name postgres_container_emis \
+        -e POSTGRES_USER=admin1 \
+        -e POSTGRES_PASSWORD=password \
+        -e POSTGRES_DB=fhir \
+        -p 5432:5432 \
+        -v pgdata:/var/lib/postgresql/data \
+        postgres:latest
 
-You will have approximately 1 week to complete this task but can as much or as little time as you deem necessary to **demonstrate your understanding of the problem, your range of skills and approach to problem solving**.
+Start the PostgreSQL container:
 
-Some successful candidates have spent as little as 3 hours whilst others have used the full week because they've enjoyed exploring different ideas, technologies and approaches.
+    docker start postgres_container_emis
 
-## The Task
-An external system / supplier is sending patient data to our platform using the FHIR standard. Our analytics teams find this format difficult to work with when creating dashboards and visualizations. You are required to tranform these FHIR messages into a more workable format preferably in a tabular format. Include any documentation / commentary you deem necessary.
+Build the Docker image for FHIR ingestion:
 
+    docker build -t fhir_ingestion .
 
-## The Solution
-If you are applying for a position that uses one specific programming language, please write your solution in that language, otherwise your solution can use any of the following technologies along with **any frameworks, libraries you feel appropriate**:
+Run the FHIR ingestion container:
 
-- **Programming Languages** - Java / Python / Scala / Go / C#
-- **Data Storage Layer** - MongoDB / MySql / Postgres / SQLServer Express / Filesystem (CSV/Parquet/Orc)
+    docker run -it fhir_ingestion
 
-Containerising your pipeline using docker / docker-compose is strongly encouraged, but not required.
+Provide the PostgreSQL container's IP address when prompted:
 
-## Evaluation
-We take into account 5 areas when evaluating a solution. Each criteria is evaluated from 0 (non-existent) to 5 (excellent) and your final score would be a simple average across all 5 areas. These are:
 
-- **Functionality**: Is the solution correct? Does it run in a decent amount of time? How well thought and architected is the solution?
-- **Good Practices**: Does the code follow standard practices for the language and framework used? Take into account reusability, names, function length, structure, how crendentials are handled, etc.
-- **Testing**: Unit and integration tests.
-- **Execution environment**: Container, Virtual Environment, Dependency Management, Isolation, Ease of transition into a production environment etc.
-- **Documentation**: How to install and run the solution? How to see and use the results? What is the architecture? Any next steps?
+Docker image is publicly available
 
-## Context
-[FHIR](/https://www.hl7.org/fhir/overview.html) is a popular standard within healthcare used by healthcare systems to exchange data and represent details of paitents in a standardised way.
-Some sample FHIR data has been generated in the data directory using a tool called [synthea](https://www.hl7.org/fhir/overview.html).
-
-## Submit your solution
-Create a public Github repository and push your solution including any documentation you feel necessary. Commit often - we would rather see a history of trial and error than a single monolithic push. When you're finished, please send us the URL to the repository.
+sellapandi/applications:latest
diff --git a/data_ingestion/__init__.py b/data_ingestion/__init__.py
new file mode 100644
index 0000000..0104905
--- /dev/null
+++ b/data_ingestion/__init__.py
@@ -0,0 +1,51 @@
+import pandas as pd
+import json, glob
+from extraction.extraction import *
+from db_connection import db_conn
+
+# Target table name -> accumulated rows and the column names used on output.
+metadata = {
+    'Patient': {"data": [], "columns": ['id', 'passport_no', 'driving_license', 'social_security_no', 'medical_record_no', 'name', 'telecom', 'gender', 'birth_date', 'deceased_date_time', 'address', 'marital_status', 'communication']},
+    'Encounter': {"data": [], "columns": ['id', 'status', 'class_code', 'type_code', 'type_text', 'subject', 'start_date', 'end_date', 'location', 'service_provider']},
+    'Condition': {"data": [], "columns": ['id', 'clinical_status', 'verification_status', 'category_code', 'category', 'subject', 'encounter', 'onset_recorded', 'recorded_date', 'code', 'text']},
+    'DiagnosticReport': {"data": [], "columns": ['id', 'status', 'category', 'subject', 'encounter', 'effective_date', 'issued_date', 'performer']},
+    'DocumentReference': {"data": [], "columns": ['id', 'status', 'type', 'category', 'subject', 'date', 'author', 'custodian', 'encounter', 'period_start', 'period_end']},
+    'Claim': {"data": [], "columns": ['id', 'status', 'type_code', 'patient', 'billable_start_date', 'billable_end_date', 'created_date', 'provider', 'priority', 'insurence', 'service_codes', 'total_amount']},
+    'Procedure': {"data": [], "columns": ['id', 'status', 'code', 'text', 'subject', 'encounter', 'performed_start_date', 'performed_end_date', 'location']},
+    'Immunization': {"data": [], "columns": ['id', 'status', 'vaccine_code', 'vaccine_name', 'patient', 'encounter', 'occurrence_date_time', 'primary_source', 'location']},
+    'ExplanationOfBenefit': {"data": [], "columns": ['id', 'status', 'patient', 'insurer', 'claim_id', 'total_amount', 'payment_amount']}
+}
+
+# FHIR resourceType -> extraction function (wildcard-imported from extraction.extraction).
+handlers = {
+    'Patient': ingest_patient_data,
+    'Encounter': ingest_encounter_data,
+    'Condition': ingest_condition_data,
+    'DiagnosticReport': ingest_diagnostic_data,
+    'DocumentReference': ingest_document_reference,
+    'Claim': ingest_claim_data,
+    'Procedure': ingest_procedure_data,
+    'Immunization': ingest_immunization_data,
+    'ExplanationOfBenefit': ingest_eob_data,
+}
+
+json_files = glob.glob("*.json")
+print("Input files : ", len(json_files))
+for file in json_files:
+    with open(file, 'r') as f:
+        raw_data = json.load(f)
+    # Each file is a FHIR Bundle; dispatch every entry to its extractor.
+    # Unknown resource types are skipped, as before.
+    for each_entry in raw_data['entry']:
+        resource = each_entry['resource']
+        handler = handlers.get(resource['resourceType'])
+        if handler is not None:
+            metadata[resource['resourceType']]['data'].append(handler(resource))
+
+for table, data_colums in metadata.items():
+    df = pd.DataFrame(data_colums['data'], columns=data_colums['columns'])
+    # JSON-encode list-valued columns so they can be stored in a text column.
+    # Guard df.empty: df[col].iloc[0] would raise IndexError on an empty table.
+    for col in df.columns:
+        if not df.empty and isinstance(df[col].iloc[0], list):
+            df[col] = df[col].apply(json.dumps)
+    df.to_sql(name=table, con=db_conn, if_exists="replace", index=False)
+print('Successfully Ingested!')
diff --git a/data_ingestion/db_connection.py b/data_ingestion/db_connection.py
new file mode 100644
index 0000000..a78f8b0
--- /dev/null
+++ b/data_ingestion/db_connection.py
@@ -0,0 +1,12 @@
+from sqlalchemy import create_engine
+
+# NOTE(review): credentials are hard-coded to mirror the README's docker run
+# command; move them to environment variables before any production use.
+print('run this command to get the postgres IP `docker inspect postgres_container_emis | grep "IPAddress"`')
+ip = input()
+if ip:
+    db_conn = create_engine(f"postgresql+psycopg2://admin1:password@{ip}:5432/fhir")
+else:
+    # Without an IP there is nothing to connect to; stop explicitly instead of
+    # failing later with a NameError on db_conn.
+    raise SystemExit('No PostgreSQL IP supplied.')
diff --git a/data_ingestion/extraction/extraction.py b/data_ingestion/extraction/extraction.py
new file mode 100644
index 0000000..b6e12bb
--- /dev/null
+++ b/data_ingestion/extraction/extraction.py
@@ -0,0 +1,145 @@
+"""Flatten FHIR resource dicts into row tuples.
+
+Each ingest_* function takes one parsed FHIR resource (a dict) and returns
+a tuple whose order matches the column list declared for that resource
+type in data_ingestion's metadata.
+"""
+
+def ingest_claim_data(data):
+    """Flatten a Claim resource."""
+    id = data.get('id')
+    status = data.get('status')
+    type_code = data.get('type')['coding'][0]['code']
+    patient = data['patient']['reference'].split(':')[-1]
+    billable_start_date = data.get('billablePeriod')['start']
+    billable_end_date = data.get('billablePeriod')['end']
+    created_date = data.get('created')
+    provider = data['provider']['display']
+    priority = data.get('priority')['coding'][0]['code']
+    insurence = [each['coverage']['display'] for each in data.get('insurance')]
+    service_codes = [each['productOrService']['coding'][0]['code'] for each in data['item']]
+    total_amnt = float(data.get('total')['value'])
+    return (id, status, type_code, patient, billable_start_date, billable_end_date, created_date, provider, priority, insurence, service_codes, total_amnt)
+
+def ingest_condition_data(data):
+    """Flatten a Condition resource."""
+    id = data.get('id')
+    clinical_status = data.get('clinicalStatus')['coding'][0]['code']
+    verification_status = data.get('verificationStatus')['coding'][0]['code']
+    category_code = data.get('category')[0]['coding'][0]['code']
+    category = data.get('category')[0]['coding'][0]['display']
+    subject = data['subject']['reference'].split(':')[-1]
+    encounter = data['encounter']['reference'].split(':')[-1]
+    onsetrecorded = data.get('onsetDateTime')
+    recorded_date = data.get('recordedDate')
+    code = data.get('code')['coding'][0]['code']
+    text = data.get('code')['coding'][0]['display']
+    return (id, clinical_status, verification_status, category_code, category, subject, encounter, onsetrecorded, recorded_date, code, text)
+
+def ingest_diagnostic_data(data):
+    """Flatten a DiagnosticReport resource."""
+    id = data.get('id')
+    status = data.get('status')
+    category = [{'code': each['code'], 'display': each['display']} for each in data.get('category')[0]['coding']]
+    subject = data['subject']['reference'].split(':')[-1]
+    encounter = data['encounter']['reference'].split(':')[-1]
+    effective_date = data.get('effectiveDateTime')
+    issued_date = data.get('issued')
+    performer = data.get('performer')[0]['display']
+    return (id, status, category, subject, encounter, effective_date, issued_date, performer)
+
+def ingest_document_reference(data):
+    """Flatten a DocumentReference resource."""
+    id = data.get('id')
+    status = data.get('status')
+    type = [{'code': each['code'], 'display': each['display']} for each in data.get('type')['coding']]
+    category = [{'code': each['code'], 'display': each['display']} for each in data.get('category')[0]['coding']]
+    subject = data['subject']['reference'].split(':')[-1]
+    date = data.get('date')
+    author = data.get('author')[0]['display'].split(':')[-1]
+    custodian = data.get('custodian')['display'].split(':')[-1]
+    encounter = data['context']['encounter'][0]['reference'].split(':')[-1]
+    period_start = data['context']['period']['start']
+    period_end = data['context']['period']['end']
+    return (id, status, type, category, subject, date, author, custodian, encounter, period_start, period_end)
+
+def ingest_encounter_data(data):
+    """Flatten an Encounter resource."""
+    id = data.get('id')
+    status = data.get('status')
+    class_code = data.get('class')['code']
+    type_code = data.get('type')[0]['coding'][0]['code']
+    type_text = data.get('type')[0]['coding'][0]['display']
+    subject = data.get('subject')['reference'].split(':')[-1]
+    start_date = data.get('period')['start']
+    end_date = data.get('period')['end']
+    location = data.get('location')[0]['location']['display']
+    serviceprovider = data.get('serviceProvider')['display']
+    return (id, status, class_code, type_code, type_text, subject, start_date, end_date, location, serviceprovider)
+
+def ingest_eob_data(data):
+    """Flatten an ExplanationOfBenefit resource."""
+    id = data.get('id')
+    status = data.get('status')
+    patient = data['patient']['reference'].split(':')[-1]
+    insurer = data['insurer']['display']
+    claim_id = data['claim']['reference'].split(':')[-1]
+    total_amount = float(data['total'][0]['amount']['value'])
+    payment_amount = float(data['payment']['amount']['value'])
+    return (id, status, patient, insurer, claim_id, total_amount, payment_amount)
+
+def ingest_immunization_data(data):
+    """Flatten an Immunization resource."""
+    id = data.get('id')
+    status = data.get('status')
+    vaccine_code = [each['code'] for each in data.get('vaccineCode')['coding']]
+    vaccine_name = [each['display'] for each in data.get('vaccineCode')['coding']]
+    patient = data['patient']['reference'].split(':')[-1]
+    encounter = data['encounter']['reference'].split(':')[-1]
+    occurrenceDateTime = data.get('occurrenceDateTime')
+    primary_source = data.get('primarySource')
+    location = data.get('location')['display']
+    return (id, status, vaccine_code, vaccine_name, patient, encounter, occurrenceDateTime, primary_source, location)
+
+def ingest_patient_data(data):
+    """Flatten a Patient resource, picking out well-known identifier types."""
+    id = data.get('id')
+    identifier = data.get('identifier')
+    ppn, dl, ss, mr = None, None, None, None
+    for each in identifier:
+        if 'type' in each:
+            if 'coding' in each['type']:
+                for each_coding in each['type']['coding']:
+                    if each_coding['code'] == 'PPN':
+                        ppn = each['value']
+                    if each_coding['code'] == 'DL':
+                        dl = each['value']
+                    if each_coding['code'] == 'SS':
+                        ss = each['value']
+                    if each_coding['code'] == 'MR':
+                        mr = each['value']
+
+    name = data.get('name')[0]['given'][0] + ' ' + data.get('name')[0]['family']
+    telecom = data.get('telecom')[0]['value']
+    gender = data.get('gender')
+    birthDate = data.get('birthDate')
+    deceasedDateTime = data.get('deceasedDateTime', '')
+    address = data.get('address')[0]['line'][0]+ ', ' + data.get('address')[0]['city'] + ', '+ data.get('address')[0]['state'] +', '+ data.get('address')[0]['country']
+    maritalStatus = data.get('maritalStatus')['coding'][0]['code']
+    communication = data.get('communication')[0]['language']['text']
+    return (id, ppn, dl, ss, mr, name, telecom, gender, birthDate, deceasedDateTime, address, maritalStatus, communication)
+
+def ingest_procedure_data(data):
+    """Flatten a Procedure resource."""
+    id = data.get('id', '')
+    status = data.get('status', '')
+    code = [each['code'] for each in data.get('code', {}).get('coding', [])]
+    text = [each['display'] for each in data.get('code', {}).get('coding', [])]
+    subject = data.get('subject', {}).get('reference', '').split(':')[-1]
+    encounter = data.get('encounter', {}).get('reference', '').split(':')[-1]
+    # Bug fix: 'performedDateTime' is a plain string in FHIR, so .get('start')
+    # on it raised AttributeError; the period start lives in 'performedPeriod'.
+    performed_start_date = data.get('performedPeriod', {}).get('start', '') or data.get('performedDateTime', '')
+    performed_end_date = data.get('performedPeriod', {}).get('end', '')
+    location = data.get('location', {}).get('display', '')
+    return (id, status, code, text, subject, encounter, performed_start_date, performed_end_date, location)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..8c3d91e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+pandas
+psycopg2-binary
+sqlalchemy