diff --git a/jibaro/datalake/avro_handler.py b/jibaro/datalake/avro_handler.py index f6b27ae..4c6d712 100644 --- a/jibaro/datalake/avro_handler.py +++ b/jibaro/datalake/avro_handler.py @@ -9,6 +9,41 @@ __all__ = ["avro_handler"] +def extract_special_types(input_dict, target_type): + saved_records = {} + + def traverse_dict(d, path=""): + if isinstance(d, list): + results = [] + for item in d: + r = traverse_dict(item, path) + if r is not None: + if isinstance(r, list): + results.extend(r) + else: + results.append(r) + return results if len(results) > 0 else None + elif isinstance(d, dict): + connector_name = d.get("connect.name") + if connector_name is not None and connector_name == target_type: + return path + name = d.get("name") + type_field = d.get("type") + if type_field is not None: + if type_field == "record": + saved_records[name] = d["fields"] + return traverse_dict(d["fields"], path) + else: + if name is not None: + path = path + "." + name if path != "" else name + return traverse_dict(type_field, path) + elif isinstance(d, str): + if d in saved_records: + return traverse_dict(saved_records[d], path) + + return traverse_dict(input_dict) + + def process_confluent_schemaregistry( spark: SparkSession, schema_registry_client: SchemaRegistryClient, @@ -68,7 +103,7 @@ def process_confluent_schemaregistry( & (fn.col("valueSchemaId") == currentValueSchemaId.value) ) - ( + temp = ( filterDF.select( from_avro("key", currentKeySchema.value, fromAvroOptions).alias("key"), from_avro("value", currentValueSchema.value, fromAvroOptions).alias( @@ -82,19 +117,80 @@ def process_confluent_schemaregistry( fn.col("keySchemaId").cast("integer"), fn.col("valueSchemaId").cast("integer"), ) - .write.format("delta") - .mode("append") - .option("mergeSchema", "true") - .save( - mount_path( - layer=target_layer, - project_name=project_name, - database=database, - table_name=table_name, - ) - ) + # .write.format("delta") + # .mode("append") + # .option("mergeSchema", "true") + # .save( + # mount_path( + # layer=target_layer, + # project_name=project_name, + # database=database, + # table_name=table_name, + # ) + # ) + ) + + import json + + date_fields = extract_special_types( + json.loads(currentValueSchema.value), + "io.debezium.time.Date", + ) + microtimestamp_fields = extract_special_types( + json.loads(currentValueSchema.value), + "io.debezium.time.MicroTimestamp", + ) + + microtimestamp_tz_fields = extract_special_types( + json.loads(currentValueSchema.value), + "io.debezium.time.ZonedTimestamp", ) + print(">>>", date_fields, microtimestamp_fields, microtimestamp_tz_fields) + + if date_fields is not None: + for column in date_fields: + temp = temp.withColumn( + "value", + fn.col("value").withField( + column, + fn.date_add(fn.lit("1970-01-01"), fn.col(f"value.{column}")), + ), + ) + if microtimestamp_fields is not None: + timezone_local = spark.conf.get("spark.sql.session.timeZone") + for column in microtimestamp_fields: + temp = temp.withColumn( + "value", + fn.col("value").withField( + column, + fn.convert_timezone( + fn.lit(timezone_local), + fn.lit("UTC"), + fn.to_timestamp(fn.col(f"value.{column}") / 1000 / 1000), + ), + ), + ) + + if microtimestamp_tz_fields is not None: + for column in microtimestamp_tz_fields: + temp = temp.withColumn( + "value", + fn.col("value").withField( + column, + fn.to_timestamp(fn.col(f"value.{column}")), + ), + ) + + temp.select("value.before.*").show() + temp.select("value.after.*").show(truncate=False) + temp.select("value.after.*").printSchema() + # timezone_default = spark.conf.get("spark.sql.session.timeZone") + # print(">>>> timezone_default", timezone_default) + # spark.conf.set("spark.sql.session.timeZone", "UTC") + # temp.select("value.after.*").show(truncate=False) + # spark.conf.set("spark.sql.session.timeZone", timezone_default) + def avro_handler( spark, @@ -116,10 +212,10 @@ def avro_handler( ( df.writeStream.trigger(once=True) - .option( - "checkpointLocation", - mount_checkpoint_path(target_layer, project_name, database, table_name), - ) + # .option( + # "checkpointLocation", + # mount_checkpoint_path(target_layer, project_name, database, table_name), + # ) .foreachBatch( lambda df_batch, batch_id: process_confluent_schemaregistry( spark=spark, diff --git a/lake_lab/docker-compose.lake.yaml b/lake_lab/docker-compose.lake.yaml index 9ac58dd..db318cc 100644 --- a/lake_lab/docker-compose.lake.yaml +++ b/lake_lab/docker-compose.lake.yaml @@ -1,5 +1,5 @@ --- -version: '2' +version: "2" services: zookeeper: image: confluentinc/cp-zookeeper:7.2.0 @@ -24,7 +24,7 @@ services: - "9101:9101" environment: KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 @@ -48,7 +48,7 @@ services: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092" SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 connect: @@ -62,7 +62,7 @@ services: ports: - "8083:8083" environment: - CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' + CONNECT_BOOTSTRAP_SERVERS: "broker:29092" CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs @@ -89,7 +89,7 @@ services: # To monitor Kafka kowl: - image: quay.io/cloudhut/kowl:master + image: ignitz/kowl:latest hostname: kowl container_name: kowl depends_on: @@ -106,7 +106,7 @@ services: # To use as backend of Airflow and datasource to Kafka-Connect postgres: - image: quay.io/debezium/example-postgres:1.9 + image: quay.io/debezium/example-postgres:2.6 hostname: postgres container_name: postgres restart: unless-stopped @@ -116,11 +116,11 @@ services: volumes: - postgres-data:/var/lib/postgresql/data ports: - - 5432:5432 + - 5432:5432 environment: - - POSTGRES_USER=postgres - - POSTGRES_PASSWORD=postgres - + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + minio: image: quay.io/minio/minio:latest hostname: minio @@ -132,7 +132,7 @@ services: environment: MINIO_ROOT_USER: minio MINIO_ROOT_PASSWORD: miniominio - ## uncomment to persist data in minio + ## uncomment to persist data in minio # volumes: # - ./data/minio:/data volumes: @@ -141,6 +141,7 @@ services: networks: default: name: kind + external: true volumes: minio: