Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 112 additions & 16 deletions jibaro/datalake/avro_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,41 @@
__all__ = ["avro_handler"]


def extract_special_types(input_dict, target_type):
saved_records = {}

def traverse_dict(d, path=""):
if isinstance(d, list):
results = []
for item in d:
r = traverse_dict(item, path)
if r is not None:
if isinstance(r, list):
results.extend(r)
else:
results.append(r)
return results if len(results) > 0 else None
elif isinstance(d, dict):
connector_name = d.get("connect.name")
if connector_name is not None and connector_name == target_type:
return path
name = d.get("name")
type_field = d.get("type")
if type_field is not None:
if type_field == "record":
saved_records[name] = d["fields"]
return traverse_dict(d["fields"], path)
else:
if name is not None:
path = path + "." + name if path != "" else name
return traverse_dict(type_field, path)
elif isinstance(d, str):
if d in saved_records:
return traverse_dict(saved_records[d], path)

return traverse_dict(input_dict)


def process_confluent_schemaregistry(
spark: SparkSession,
schema_registry_client: SchemaRegistryClient,
Expand Down Expand Up @@ -68,7 +103,7 @@ def process_confluent_schemaregistry(
& (fn.col("valueSchemaId") == currentValueSchemaId.value)
)

(
temp = (
filterDF.select(
from_avro("key", currentKeySchema.value, fromAvroOptions).alias("key"),
from_avro("value", currentValueSchema.value, fromAvroOptions).alias(
Expand All @@ -82,19 +117,80 @@ def process_confluent_schemaregistry(
fn.col("keySchemaId").cast("integer"),
fn.col("valueSchemaId").cast("integer"),
)
.write.format("delta")
.mode("append")
.option("mergeSchema", "true")
.save(
mount_path(
layer=target_layer,
project_name=project_name,
database=database,
table_name=table_name,
)
)
# .write.format("delta")
# .mode("append")
# .option("mergeSchema", "true")
# .save(
# mount_path(
# layer=target_layer,
# project_name=project_name,
# database=database,
# table_name=table_name,
# )
# )
)

import json

date_fields = extract_special_types(
json.loads(currentValueSchema.value),
"io.debezium.time.Date",
)
microtimestamp_fields = extract_special_types(
json.loads(currentValueSchema.value),
"io.debezium.time.MicroTimestamp",
)

microtimestamp_tz_fields = extract_special_types(
json.loads(currentValueSchema.value),
"io.debezium.time.ZonedTimestamp",
)

print(">>>", date_fields, microtimestamp_fields, microtimestamp_tz_fields)

if date_fields is not None:
for column in date_fields:
temp = temp.withColumn(
"value",
fn.col("value").withField(
column,
fn.date_add(fn.lit("1970-01-01"), fn.col(f"value.{column}")),
),
)
if microtimestamp_fields is not None:
timezone_local = spark.conf.get("spark.sql.session.timeZone")
for column in microtimestamp_fields:
temp = temp.withColumn(
"value",
fn.col("value").withField(
column,
fn.convert_timezone(
fn.lit(timezone_local),
fn.lit("UTC"),
fn.to_timestamp(fn.col(f"value.{column}") / 1000 / 1000),
),
),
)

if microtimestamp_tz_fields is not None:
for column in microtimestamp_tz_fields:
temp = temp.withColumn(
"value",
fn.col("value").withField(
column,
fn.to_timestamp(fn.col(f"value.{column}")),
),
)

temp.select("value.before.*").show()
temp.select("value.after.*").show(truncate=False)
temp.select("value.after.*").printSchema()
# timezone_default = spark.conf.get("spark.sql.session.timeZone")
# print(">>>> timezone_default", timezone_default)
# spark.conf.set("spark.sql.session.timeZone", "UTC")
# temp.select("value.after.*").show(truncate=False)
# spark.conf.set("spark.sql.session.timeZone", timezone_default)


def avro_handler(
spark,
Expand All @@ -116,10 +212,10 @@ def avro_handler(

(
df.writeStream.trigger(once=True)
.option(
"checkpointLocation",
mount_checkpoint_path(target_layer, project_name, database, table_name),
)
# .option(
# "checkpointLocation",
# mount_checkpoint_path(target_layer, project_name, database, table_name),
# )
.foreachBatch(
lambda df_batch, batch_id: process_confluent_schemaregistry(
spark=spark,
Expand Down
23 changes: 12 additions & 11 deletions lake_lab/docker-compose.lake.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
version: '2'
version: "2"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.0
Expand All @@ -24,7 +24,7 @@ services:
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Expand All @@ -48,7 +48,7 @@ services:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

connect:
Expand All @@ -62,7 +62,7 @@ services:
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
Expand All @@ -89,7 +89,7 @@ services:

# To monitor Kafka
kowl:
image: quay.io/cloudhut/kowl:master
image: ignitz/kowl:latest
hostname: kowl
container_name: kowl
depends_on:
Expand All @@ -106,7 +106,7 @@ services:

# To use as backend of Airflow and datasource to Kafka-Connect
postgres:
image: quay.io/debezium/example-postgres:1.9
image: quay.io/debezium/example-postgres:2.6
hostname: postgres
container_name: postgres
restart: unless-stopped
Expand All @@ -116,11 +116,11 @@ services:
volumes:
- postgres-data:/var/lib/postgresql/data
ports:
- 5432:5432
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres

minio:
image: quay.io/minio/minio:latest
hostname: minio
Expand All @@ -132,7 +132,7 @@ services:
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: miniominio
## uncomment to persist data in minio
## uncomment to persist data in minio
# volumes:
# - ./data/minio:/data
volumes:
Expand All @@ -141,6 +141,7 @@ services:
networks:
default:
name: kind
external: true

volumes:
minio:
Expand Down