
[Python] Unable to get the postgres data in the right format via Kafka, JDBC source connector...

Discussion in 'Python' started by Stack, October 25, 2024 at 11:52.

  1. Stack (Participating Member)

    I have created a table in Postgres:

    CREATE TABLE IF NOT EXISTS public.sample_a
    (
        id text COLLATE pg_catalog."default" NOT NULL,
        is_active boolean NOT NULL,
        is_deleted boolean NOT NULL,
        created_by integer NOT NULL,
        created_at timestamp with time zone NOT NULL,
        created_ip character varying(30) COLLATE pg_catalog."default" NOT NULL,
        created_dept_id integer NOT NULL,
        updated_by integer,
        updated_at timestamp with time zone,
        updated_ip character varying(30) COLLATE pg_catalog."default",
        updated_dept_id integer,
        deleted_by integer,
        deleted_at timestamp with time zone,
        deleted_ip character varying(30) COLLATE pg_catalog."default",
        deleted_dept_id integer,
        sql_id bigint NOT NULL,
        ipa_no character varying(30) COLLATE pg_catalog."default" NOT NULL,
        pe_id bigint NOT NULL,
        uid character varying(30) COLLATE pg_catalog."default" NOT NULL,
        mr_no character varying(15) COLLATE pg_catalog."default" NOT NULL,
        site_id integer NOT NULL,
        entered_date date NOT NULL,
        CONSTRAINT pk_patient_dilation PRIMARY KEY (id)
    );


    and I have inserted the data as below:

    INSERT INTO sample_a (id, is_active, is_deleted, created_by, created_at, created_ip, created_dept_id, updated_by, updated_at, updated_ip, updated_dept_id, deleted_by, deleted_at, deleted_ip, deleted_dept_id, sql_id, ipa_no, pe_id, uid, mr_no, site_id, entered_date)
    VALUES ('00037167-0894-4373-9a56-44c49d2285c9', TRUE, FALSE, 70516, '2024-10-05 08:12:25.069941+00','10.160.0.76', 4, 70516, '2024-10-05 09:25:55.218961+00', '10.84.0.1',4,NULL, NULL, NULL, NULL, 0,0,165587147,'22516767','P5942023',1,'10/5/24');


    Now, I have created the JDBC source connector config as below:

    {
      "name": "JdbcSourceConnectorConnector_0",
      "config": {
        "name": "JdbcSourceConnectorConnector_0",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/",
        "connection.user": "postgres",
        "connection.password": "********",
        "table.whitelist": "sample_a",
        "mode": "bulk"
      }
    }
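
    A note on this config: it sets no key.converter or value.converter, so the Connect worker's defaults apply. If the worker defaults to Confluent's AvroConverter (common in Confluent quickstart stacks that also run Schema Registry), every message value is written in Confluent's Avro wire format: one magic byte (0x00), a 4-byte schema ID, then the Avro-encoded record. Below is a minimal sketch of making the converters explicit via the Connect REST API; the localhost:8083 and schema-registry:8081 addresses are assumptions for a typical Docker setup, not taken from the question:

    import json
    import requests  # assumed to be available

    # Hypothetical: pin the value converter explicitly instead of relying on
    # the worker defaults, so the on-topic format is unambiguous.
    config = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/",
        "connection.user": "postgres",
        "connection.password": "********",
        "table.whitelist": "sample_a",
        "mode": "bulk",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
    }

    # PUT /connectors/<name>/config creates the connector or updates its config.
    resp = requests.put(
        "http://localhost:8083/connectors/JdbcSourceConnectorConnector_0/config",
        headers={"Content-Type": "application/json"},
        data=json.dumps(config),
    )
    resp.raise_for_status()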


    So when the data is pushed from the DB to the Kafka topic, I can see it in a readable format in the Kafka Control Center tab. Since I am using bulk mode, the entire table is continuously reloaded.
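
    (Bulk mode re-publishes the whole table on every poll interval. If that is not intended, the connector also supports incremental modes; the sketch below shows the relevant keys, where the column choices are guesses based on the DDL above and not validated:)

    # Hypothetical incremental settings; "created_at" and "sql_id" are
    # assumptions taken from the table definition, not tested here.
    incremental_config = {
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "created_at",  # non-null timestamp column
        "incrementing.column.name": "sql_id",   # must be strictly increasing
    }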

    My problem is that when I fetch the data via PySpark, it is not readable:

    from pyspark.sql.session import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession \
        .builder \
        .appName("Kafka_Test") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
        .getOrCreate()

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "sample_a") \
        .option("startingOffsets", "latest") \
        .load()

    df.selectExpr("cast(value as string) as value").writeStream.format("console").start()

    spark.streams.awaitAnyTermination()


    Output:

    H00037167-0894-4373-9a56-44c49d2285c9?ڹ??d10.160.0.7??????d10.84.0.0????22516767P5942023¸


    So how do I access the specific attributes? Do I need a deserializer class?

    TIA.
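
    One way to confirm what is actually in the message bytes is a quick batch read and a peek at the leading bytes. If they start with 0x00 followed by four schema-ID bytes, the values are in Confluent's Avro wire format rather than bare Avro. A sketch, assuming the same broker address as above:

    # Batch-read the topic and inspect the first bytes of one value.
    batch = (spark.read.format("kafka")
             .option("kafka.bootstrap.servers", "localhost:9092")
             .option("subscribe", "sample_a")
             .option("startingOffsets", "earliest")
             .load())

    row = batch.select("value").head()
    print(bytes(row.value[:5]).hex())  # '00' + 8 hex digits => wire-format header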

    As per Nimi's suggestion, I was able to fetch the schema via the Schema Registry and tried to use the from_avro function, but it gave me this error:

    import json
    from pyspark.sql.avro.functions import from_avro  # needs the org.apache.spark:spark-avro_2.12:3.3.0 package

    jsonSchema = {"type":"record","name":"sample_a","fields":[{"name":"id","type":"string"},{"name":"is_active","type":"boolean"},{"name":"is_deleted","type":"boolean"},{"name":"created_by","type":"int"},{"name":"created_at","type":{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}},{"name":"created_ip","type":"string"},{"name":"created_dept_id","type":"int"},{"name":"updated_by","type":["null","int"],"default":None},{"name":"updated_at","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":None},{"name":"updated_ip","type":["null","string"],"default":None},{"name":"updated_dept_id","type":["null","int"],"default":None},{"name":"deleted_by","type":["null","int"],"default":None},{"name":"deleted_at","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":None},{"name":"deleted_ip","type":["null","string"],"default":None},{"name":"deleted_dept_id","type":["null","int"],"default":None},{"name":"sql_id","type":"long"},{"name":"ipa_no","type":"string"},{"name":"pe_id","type":"long"},{"name":"uid","type":"string"},{"name":"mr_no","type":"string"},{"name":"site_id","type":"int"},{"name":"entered_date","type":{"type":"int","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Date","logicalType":"date"}}],"connect.name": "sample_a"}

    df.select(from_avro("value", json.dumps(jsonSchema)).alias("sample_a")) \
        .select("sample_a.*").writeStream.format("console").start()


    Error:

    org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:113)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:435)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 70516 out of bounds for length 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
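
    This failure pattern is consistent with the Confluent wire format noted above: from_avro expects a bare Avro body, but each value starts with a 5-byte header (magic byte plus schema ID), so the decoder misreads those bytes as field data. A bogus union index such as 70516 against a two-branch ["null", ...] union is exactly the symptom. Below is a minimal sketch of stripping the header before decoding; it assumes the values really are wire-format Avro and reuses df and jsonSchema from above:

    import json
    from pyspark.sql.functions import expr
    from pyspark.sql.avro.functions import from_avro

    # Skip the 5-byte Confluent header: 1 magic byte + a 4-byte schema ID.
    payload = df.withColumn("avro_body", expr("substring(value, 6, length(value) - 5)"))

    decoded = (payload
               .select(from_avro("avro_body", json.dumps(jsonSchema)).alias("sample_a"))
               .select("sample_a.*"))

    decoded.writeStream.format("console").start()

    Alternatively, a Schema-Registry-aware library such as ABRiS handles the header and the schema lookup for you; the header-stripping approach above is just the dependency-free variant.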

