Exception related to pk.mode and delete.enabled

I have configured JSON as below

{
  "name": "cratedb-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "mytopic",
    "connection.url":"jdbc:postgresql://localhost:5432/crate?user=crate",
    "tasks.max": "4",
    "insert.mode": "insert",
    "table.name.format": "transactions",
    "pk.mode": "kafka",
    "pk.fields": "eventday,resourceid,id",
    "schemas.enable":"false",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "value.converter.schemas.enable":"false",
    "transforms":"addAuditDate",
    "transforms.addAuditDate.type":"org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addAuditDate.timestamp.field":"AUDITDATE",
    "transforms.addAuditDate.timestamp.format":"yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}

but it gives error like

[2024-08-02 06:45:05,004] ERROR [cratedb-connector|task-3] WorkerSinkTask{id=cratedb-connector-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Sink connector 'cratedb-connector' is configured with 'delete.enabled=false' and 'pk.mode=kafka' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mytopic',partition=0,offset=20,timestamp=1722577031564) with a HashMap value and null value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.ConnectException: Sink connector 'cratedb-connector' is configured with 'delete.enabled=false' and 'pk.mode=kafka' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mytopic',partition=0,offset=20,timestamp=1722577031564) with a HashMap value and null value schema.
	at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
[2024-08-02 06:45:05,004] ERROR [cratedb-connector|task-3] WorkerSinkTask{id=cratedb-connector-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'cratedb-connector' is configured with 'delete.enabled=false' and 'pk.mode=kafka' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mytopic',partition=0,offset=20,timestamp=1722577031564) with a HashMap value and null value schema.
	at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)

Any help appreciated.

1 Like

Hi @gorde.akshay, looks like this is related to Exception in JDBC sink connector - #6 by gorde.akshay

Let’s continue conversation there.