11 changes: 11 additions & 0 deletions Dockerfile
@@ -0,0 +1,11 @@
FROM python:3.9

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY data_ingestion/ .
COPY data/ .

CMD ["python", "__init__.py"]
44 changes: 19 additions & 25 deletions README.md
@@ -1,36 +1,30 @@
# Data Engineer - Technical Assessment
Our tech teams are curious, driven, intelligent, pragmatic, collaborative and open-minded and you should be too.
Run PostgreSQL in a Docker container:

## Testing Goals
We are testing your ability to design and prototype a scalable data-pipeline (with code) underpinned with good data/software engineering principles from a blank canvas. You will need to use your intellect, creativity, judgement and be comfortable making decisions to produce a solution.
```shell
docker run -d \
  --name postgres_container_emis \
  -e POSTGRES_USER=admin1 \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=fhir \
  -p 5432:5432 \
  -v pgdata:/var/lib/postgresql/data \
  postgres:latest
```

You will have approximately 1 week to complete this task but can spend as much or as little time as you deem necessary to **demonstrate your understanding of the problem, your range of skills and approach to problem solving**.
Start the PostgreSQL container:

Some successful candidates have spent as little as 3 hours whilst others have used the full week because they've enjoyed exploring different ideas, technologies and approaches.
```shell
docker start postgres_container_emis
```

## The Task
An external system / supplier is sending patient data to our platform using the FHIR standard. Our analytics teams find this format difficult to work with when creating dashboards and visualizations. You are required to transform these FHIR messages into a more workable, preferably tabular, format. Include any documentation / commentary you deem necessary.
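For illustration, a minimal sketch of the kind of flattening being asked for (the sample fields follow the FHIR Patient resource; the flat output schema is only an assumption, not a requirement):

```python
# A trimmed FHIR Patient resource, as it appears inside a Bundle's "entry" list.
patient = {
    "resourceType": "Patient",
    "id": "abc-123",
    "name": [{"family": "Smith", "given": ["Jane"]}],
    "gender": "female",
    "birthDate": "1980-04-02",
}

# Flatten the nested resource into a single tabular row.
row = {
    "id": patient["id"],
    "name": patient["name"][0]["given"][0] + " " + patient["name"][0]["family"],
    "gender": patient["gender"],
    "birth_date": patient["birthDate"],
}
print(row["name"])  # Jane Smith
```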
Build the Docker image for FHIR ingestion:

```shell
docker build -t fhir_ingestion .
```

## The Solution
If you are applying for a position that uses one specific programming language, please write your solution in that language, otherwise your solution can use any of the following technologies along with **any frameworks, libraries you feel appropriate**:
Run the FHIR ingestion container:

- **Programming Languages** - Java / Python / Scala / Go / C#
- **Data Storage Layer** - MongoDB / MySQL / Postgres / SQL Server Express / Filesystem (CSV/Parquet/ORC)
```shell
docker run -it fhir_ingestion
```

Containerising your pipeline using docker / docker-compose is strongly encouraged, but not required.
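For candidates who take the docker-compose route, a minimal sketch might look like the following (service names and values here are illustrative, not part of the assessment):

```yaml
version: "3.8"
services:
  db:
    image: postgres:latest
    environment:
      POSTGRES_USER: admin1
      POSTGRES_PASSWORD: password
      POSTGRES_DB: fhir
    volumes:
      - pgdata:/var/lib/postgresql/data
  ingestion:
    build: .
    depends_on:
      - db
volumes:
  pgdata:
```

With compose, the ingestion service can reach the database at the hostname `db` rather than an inspected container IP.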
Provide the PostgreSQL container's IP address when prompted:

## Evaluation
We take into account 5 areas when evaluating a solution. Each criterion is evaluated from 0 (non-existent) to 5 (excellent) and your final score is a simple average across all 5 areas. These are:
`<input_ip_address_of_postgres_container_emis>`

- **Functionality**: Is the solution correct? Does it run in a reasonable amount of time? How well thought out and architected is the solution?
- **Good Practices**: Does the code follow standard practices for the language and framework used? Take into account reusability, names, function length, structure, how credentials are handled, etc.
- **Testing**: Unit and integration tests.
- **Execution environment**: Container, Virtual Environment, Dependency Management, Isolation, Ease of transition into a production environment etc.
- **Documentation**: How to install and run the solution? How to see and use the results? What is the architecture? Any next steps?
The Docker image is publicly available:

## Context
[FHIR](https://www.hl7.org/fhir/overview.html) is a popular standard used by healthcare systems to exchange data and represent patient details in a standardised way. Some sample FHIR data has been generated in the data directory using a tool called [synthea](https://github.com/synthetichealth/synthea).

## Submit your solution
Create a public GitHub repository and push your solution, including any documentation you feel necessary. Commit often - we would rather see a history of trial and error than a single monolithic push. When you're finished, please send us the URL of the repository.
```
sellapandi/applications:latest
```
51 changes: 51 additions & 0 deletions data_ingestion/__init__.py
@@ -0,0 +1,51 @@
import glob
import json

import pandas as pd

from extraction.extraction import (
    ingest_claim_data, ingest_condition_data, ingest_diagnostic_data,
    ingest_document_reference, ingest_encounter_data, ingest_eob_data,
    ingest_immunization_data, ingest_patient_data, ingest_procedure_data,
)
from db_connection import db_conn

metadata = {
'Patient': {"data": [], "columns": ['id', 'passport_no', 'driving_license', 'social_security_no', 'medical_record_no', 'name', 'telecom', 'gender', 'birth_date', 'deceased_date_time', 'address', 'marital_status', 'communication']},
'Encounter': {"data": [], "columns": ['id', 'status', 'class_code', 'type_code', 'type_text', 'subject', 'start_date', 'end_date', 'location', 'service_provider']},
'Condition': {"data": [], "columns": ['id', 'clinical_status', 'verification_status', 'category_code', 'category', 'subject', 'encounter', 'onset_recorded', 'recorded_date', 'code', 'text']},
'DiagnosticReport': {"data": [], "columns": ['id', 'status', 'category', 'subject', 'encounter', 'effective_date', 'issued_date', 'performer']},
'DocumentReference': {"data": [], "columns": ['id', 'status', 'type', 'category', 'subject', 'date', 'author', 'custodian', 'encounter', 'period_start', 'period_end']},
'Claim': {"data": [], "columns": ['id', 'status', 'type_code', 'patient', 'billable_start_date', 'billable_end_date', 'created_date', 'provider', 'priority', 'insurance', 'service_codes', 'total_amount']},
'Procedure': {"data": [], "columns": ['id', 'status', 'code', 'text', 'subject', 'encounter', 'performed_start_date', 'performed_end_date', 'location']},
'Immunization': {"data": [], "columns": ['id', 'status', 'vaccine_code', 'vaccine_name', 'patient', 'encounter', 'occurrence_date_time', 'primary_source', 'location']},
'ExplanationOfBenefit': {"data": [], "columns": ['id', 'status', 'patient', 'insurer', 'claim_id', 'total_amount', 'payment_amount']}
}

RESOURCE_HANDLERS = {
    'Patient': ingest_patient_data,
    'Encounter': ingest_encounter_data,
    'Condition': ingest_condition_data,
    'DiagnosticReport': ingest_diagnostic_data,
    'DocumentReference': ingest_document_reference,
    'Claim': ingest_claim_data,
    'Procedure': ingest_procedure_data,
    'Immunization': ingest_immunization_data,
    'ExplanationOfBenefit': ingest_eob_data,
}

json_files = glob.glob("*.json")
print("Input files:", len(json_files))
for file in json_files:
    with open(file, 'r') as f:
        raw_data = json.load(f)
    for entry in raw_data.get('entry', []):
        resource = entry['resource']
        handler = RESOURCE_HANDLERS.get(resource['resourceType'])
        if handler:
            metadata[resource['resourceType']]['data'].append(handler(resource))

for table, table_meta in metadata.items():
    df = pd.DataFrame(table_meta['data'], columns=table_meta['columns'])

    # Serialise list-valued columns to JSON strings so Postgres can store them as text.
    for col in df.columns:
        if not df.empty and isinstance(df[col].iloc[0], list):
            df[col] = df[col].apply(json.dumps)
    df.to_sql(name=table, con=db_conn, if_exists="replace", index=False)
print('Successfully ingested!')
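The list-column handling can be sketched in isolation: columns holding Python lists are JSON-encoded before `to_sql`, since the database driver cannot adapt arbitrary Python lists (the sample data below is made up for illustration):

```python
import json

import pandas as pd

# A toy frame with one list-valued column, mimicking e.g. 'service_codes'.
df = pd.DataFrame({"id": ["p1"], "codes": [["1234", "5678"]]})

# Same guard as the pipeline: skip empty frames, JSON-encode list columns.
if not df.empty:
    for col in df.columns:
        if isinstance(df[col].iloc[0], list):
            df[col] = df[col].apply(json.dumps)

print(df["codes"].iloc[0])  # '["1234", "5678"]'
```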
8 changes: 8 additions & 0 deletions data_ingestion/db_connection.py
@@ -0,0 +1,8 @@
import sys

from sqlalchemy import create_engine

print('Run `docker inspect postgres_container_emis | grep "IPAddress"` to find the Postgres IP.')
ip = input('Postgres IP address: ').strip()
if not ip:
    sys.exit('No IP address provided; aborting.')
db_conn = create_engine(f"postgresql+psycopg2://admin1:password@{ip}:5432/fhir")
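Credential handling is one of the stated evaluation criteria; a sketch of building the same connection URL from environment variables instead of hard-coded values (the variable names and fallbacks here are assumptions):

```python
import os

def pg_url_from_env() -> str:
    """Build a SQLAlchemy Postgres URL from environment variables."""
    user = os.environ.get("POSTGRES_USER", "admin1")
    password = os.environ.get("POSTGRES_PASSWORD", "password")
    host = os.environ.get("POSTGRES_HOST", "localhost")
    db = os.environ.get("POSTGRES_DB", "fhir")
    return f"postgresql+psycopg2://{user}:{password}@{host}:5432/{db}"

# db_conn = create_engine(pg_url_from_env())  # as in db_connection.py
print(pg_url_from_env())
```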
139 changes: 139 additions & 0 deletions data_ingestion/extraction/extraction.py
@@ -0,0 +1,139 @@
def ingest_claim_data(data):
    # Fields required by the FHIR Claim resource are indexed directly;
    # genuinely optional ones go through .get().
    id = data.get('id')
    status = data.get('status')
    type_code = data['type']['coding'][0]['code']
    patient = data['patient']['reference'].split(':')[-1]
    billable_start_date = data['billablePeriod']['start']
    billable_end_date = data['billablePeriod']['end']
    created_date = data.get('created')
    provider = data['provider']['display']
    priority = data['priority']['coding'][0]['code']
    insurance = [each['coverage']['display'] for each in data.get('insurance', [])]
    service_codes = [each['productOrService']['coding'][0]['code'] for each in data.get('item', [])]
    total_amount = float(data['total']['value'])
    return (id, status, type_code, patient, billable_start_date, billable_end_date, created_date, provider, priority, insurance, service_codes, total_amount)

def ingest_condition_data(data):
    id = data.get('id')
    clinical_status = data['clinicalStatus']['coding'][0]['code']
    verification_status = data['verificationStatus']['coding'][0]['code']
    category_code = data['category'][0]['coding'][0]['code']
    category = data['category'][0]['coding'][0]['display']
    subject = data['subject']['reference'].split(':')[-1]
    encounter = data['encounter']['reference'].split(':')[-1]
    onset_recorded = data.get('onsetDateTime')
    recorded_date = data.get('recordedDate')
    code = data['code']['coding'][0]['code']
    text = data['code']['coding'][0]['display']
    return (id, clinical_status, verification_status, category_code, category, subject, encounter, onset_recorded, recorded_date, code, text)


def ingest_diagnostic_data(data):
    id = data.get('id')
    status = data.get('status')
    category = [{'code': each['code'], 'display': each['display']} for each in data['category'][0]['coding']]
    subject = data['subject']['reference'].split(':')[-1]
    encounter = data['encounter']['reference'].split(':')[-1]
    effective_date = data.get('effectiveDateTime')
    issued_date = data.get('issued')
    performer = data['performer'][0]['display']
    return (id, status, category, subject, encounter, effective_date, issued_date, performer)

def ingest_document_reference(data):
    id = data.get('id')
    status = data.get('status')
    doc_type = [{'code': each['code'], 'display': each['display']} for each in data['type']['coding']]
    category = [{'code': each['code'], 'display': each['display']} for each in data['category'][0]['coding']]
    subject = data['subject']['reference'].split(':')[-1]
    date = data.get('date')
    author = data['author'][0]['display']
    custodian = data['custodian']['display']
    encounter = data['context']['encounter'][0]['reference'].split(':')[-1]
    period_start = data['context']['period']['start']
    period_end = data['context']['period']['end']
    return (id, status, doc_type, category, subject, date, author, custodian, encounter, period_start, period_end)


def ingest_encounter_data(data):
    id = data.get('id')
    status = data.get('status')
    class_code = data['class']['code']
    type_code = data['type'][0]['coding'][0]['code']
    type_text = data['type'][0]['coding'][0]['display']
    subject = data['subject']['reference'].split(':')[-1]
    start_date = data['period']['start']
    end_date = data['period']['end']
    location = data['location'][0]['location']['display']
    service_provider = data['serviceProvider']['display']
    return (id, status, class_code, type_code, type_text, subject, start_date, end_date, location, service_provider)

def ingest_eob_data(data):

id = data.get('id')
status = data.get('status')
patient = data['patient']['reference'].split(':')[-1]
insurer = data['insurer']['display']
claim_id = data['claim']['reference'].split(':')[-1]
total_amount = float(data['total'][0]['amount']['value'])
payment_amount = float(data['payment']['amount']['value'])
return (id, status, patient, insurer, claim_id, total_amount, payment_amount)

def ingest_immunization_data(data):
    id = data.get('id')
    status = data.get('status')
    vaccine_code = [each['code'] for each in data['vaccineCode']['coding']]
    vaccine_name = [each['display'] for each in data['vaccineCode']['coding']]
    patient = data['patient']['reference'].split(':')[-1]
    encounter = data['encounter']['reference'].split(':')[-1]
    occurrence_date_time = data.get('occurrenceDateTime')
    primary_source = data.get('primarySource')
    location = data['location']['display']
    return (id, status, vaccine_code, vaccine_name, patient, encounter, occurrence_date_time, primary_source, location)

def ingest_patient_data(data):
    id = data.get('id')
    # Pull passport, driving licence, SSN and medical-record numbers out of the identifier list.
    ppn, dl, ss, mr = None, None, None, None
    for each in data.get('identifier', []):
        for each_coding in each.get('type', {}).get('coding', []):
            if each_coding['code'] == 'PPN':
                ppn = each['value']
            elif each_coding['code'] == 'DL':
                dl = each['value']
            elif each_coding['code'] == 'SS':
                ss = each['value']
            elif each_coding['code'] == 'MR':
                mr = each['value']

    name = data['name'][0]['given'][0] + ' ' + data['name'][0]['family']
    telecom = data['telecom'][0]['value']
    gender = data.get('gender')
    birth_date = data.get('birthDate')
    deceased_date_time = data.get('deceasedDateTime', '')
    addr = data['address'][0]
    address = ', '.join([addr['line'][0], addr['city'], addr['state'], addr['country']])
    marital_status = data['maritalStatus']['coding'][0]['code']
    communication = data['communication'][0]['language']['text']
    return (id, ppn, dl, ss, mr, name, telecom, gender, birth_date, deceased_date_time, address, marital_status, communication)

def ingest_procedure_data(data):
    id = data.get('id', '')
    status = data.get('status', '')
    code = [each['code'] for each in data.get('code', {}).get('coding', [])]
    text = [each['display'] for each in data.get('code', {}).get('coding', [])]
    subject = data.get('subject', {}).get('reference', '').split(':')[-1]
    encounter = data.get('encounter', {}).get('reference', '').split(':')[-1]
    # 'performedDateTime' is a plain string; start/end dates live under 'performedPeriod'.
    performed_start_date = data.get('performedPeriod', {}).get('start', data.get('performedDateTime', ''))
    performed_end_date = data.get('performedPeriod', {}).get('end', '')
    location = data.get('location', {}).get('display', '')
    return (id, status, code, text, subject, encounter, performed_start_date, performed_end_date, location)
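Deeply nested FHIR fields make extraction brittle: a missing optional field raises an exception partway through an indexing chain. A small null-safe accessor is one way to make the lookups uniform (this helper is a suggestion, not part of the original code):

```python
def dig(obj, *keys, default=None):
    """Walk a nested dict/list structure, returning `default` if any step is missing."""
    for key in keys:
        try:
            obj = obj[key]
        except (KeyError, IndexError, TypeError):
            return default
    return obj

# Example against a trimmed Claim-like resource.
resource = {"priority": {"coding": [{"code": "normal"}]}}
print(dig(resource, "priority", "coding", 0, "code"))      # normal
print(dig(resource, "total", "value", default="missing"))  # missing
```

Each `data.get('x')['y']` chain then becomes a single `dig(data, 'x', 'y')` call with an explicit default.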
3 changes: 3 additions & 0 deletions requirements.txt
@@ -0,0 +1,3 @@
pandas
psycopg2-binary
sqlalchemy