I have configured the Kafka Connect JDBC sink connector for CrateDB with the following JSON:
{
  "name": "cratedb-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "mytopic",
    "connection.url": "jdbc:postgresql://localhost:5432/crate?user=crate",
    "tasks.max": "4",
    "insert.mode": "insert",
    "table.name.format": "transactions",
    "pk.mode": "kafka",
    "pk.fields": "eventday,resourceid,id",
    "schemas.enable": "false",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "addAuditDate",
    "transforms.addAuditDate.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addAuditDate.timestamp.field": "AUDITDATE",
    "transforms.addAuditDate.timestamp.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}
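For context, the messages on the topic are plain JSON without a schema envelope (hence schemas.enable=false on both converters). A representative message looks roughly like this (the field names and values here are only illustrative, not my actual data):

{
  "eventday": "2024-08-02",
  "resourceid": "res-1",
  "id": "42",
  "amount": 10.5
}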
When the connector runs, its tasks fail with the following error:
[2024-08-02 06:45:05,004] ERROR [cratedb-connector|task-3] WorkerSinkTask{id=cratedb-connector-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Sink connector 'cratedb-connector' is configured with 'delete.enabled=false' and 'pk.mode=kafka' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mytopic',partition=0,offset=20,timestamp=1722577031564) with a HashMap value and null value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.ConnectException: Sink connector 'cratedb-connector' is configured with 'delete.enabled=false' and 'pk.mode=kafka' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mytopic',partition=0,offset=20,timestamp=1722577031564) with a HashMap value and null value schema.
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
[2024-08-02 06:45:05,004] ERROR [cratedb-connector|task-3] WorkerSinkTask{id=cratedb-connector-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'cratedb-connector' is configured with 'delete.enabled=false' and 'pk.mode=kafka' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mytopic',partition=0,offset=20,timestamp=1722577031564) with a HashMap value and null value schema.
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
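From the error I understand that with pk.mode=kafka the sink requires each record value to be a Connect Struct with a non-null schema, which the schemaless JsonConverter cannot provide. If I read the docs right, one fix would be to set value.converter.schemas.enable=true and produce envelope-style JSON like the sketch below (a rough sketch only; the field names and types are placeholders for my actual columns):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "eventday", "type": "string", "optional": false },
      { "field": "resourceid", "type": "string", "optional": false },
      { "field": "id", "type": "string", "optional": false },
      { "field": "amount", "type": "float64", "optional": true }
    ]
  },
  "payload": {
    "eventday": "2024-08-02",
    "resourceid": "res-1",
    "id": "42",
    "amount": 10.5
  }
}

Is that the right direction, or is there a way to keep the messages schemaless?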
Any help is appreciated.