Exception in JDBC sink connector

I have configured the connector with the JSON file below:

{
  "name": "cratedb-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "mytopic",
    "connection.url": "jdbc:crate://localhost:5432/crate?user=crate",
    "tasks.max": "4",
    "insert.mode": "insert",
    "table.name.format": "mytable",
    "pk.mode": "kafka",
    "pk.fields": "eventday,resourceid,id",
    "transforms": "toJSON,wrapValue",
    "transforms.toJSON.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value",
    "transforms.toJSON.schemas.enable": "false",
    "transforms.wrapValue.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.wrapValue.field": "attributes",
    "transforms": "addAuditDate",
    "transforms.addAuditDate.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addAuditDate.timestamp.field": "AUDITDATE",
    "transforms.addAuditDate.timestamp.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}

but it gives this exception:

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Any help appreciated.

Could you try using the stock PostgreSQL driver instead of the CrateDB one? i.e. "connection.url": "jdbc:postgres://localhost:5432/crate?user=crate"
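
In the connector config that is just the connection.url key, something like the line below (scheme depends on which driver ends up being used; the stock PostgreSQL driver itself registers jdbc:postgresql://, host/port/user taken from your config above):

"connection.url": "jdbc:postgresql://localhost:5432/crate?user=crate"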

Please take a look at the example here: cratedb-flink-jobs/src/main/java/io/crate/flink/demo/SimpleJdbcSinkJob.java at main · crate/cratedb-flink-jobs · GitHub

Additionally, I found this open issue, which may be related: JDBC Sink: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. · Issue #574 · confluentinc/kafka-connect-jdbc · GitHub
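
For reference, the schemas.enable flag from the error message belongs to the value converter and is set either in the worker properties or per connector. Just a sketch of where the key lives (not a recommendation of which value fits your data):

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"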

Hi matriv,
As per your suggestion, I have changed the driver, but it gives the exception below:

[2024-08-02 05:51:26,785] ERROR [cratedb-connector|task-3] WorkerSinkTask{id=cratedb-connector-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:postgres://localhost:5432/gra?user=crate
	at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:62)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:postgres://localhost:5432/gra?user=crate
	at java.sql/java.sql.DriverManager.getConnection(Unknown Source)
	at java.sql/java.sql.DriverManager.getConnection(Unknown Source)
	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:256)
	at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:84)
	at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:54)
	... 13 more

You would need to include the PostgreSQL JDBC driver in your build:
https://mvnrepository.com/artifact/org.postgresql/postgresql/42.7.3
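
For a typical Kafka Connect installation that just means placing the driver jar on the connector's classpath and restarting the worker, roughly like this (paths are illustrative and depend on where kafka-connect-jdbc is installed):

# download the driver jar from Maven Central (version as in the link above)
curl -LO https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar
# copy it next to the kafka-connect-jdbc plugin jars
cp postgresql-42.7.3.jar /usr/share/java/kafka-connect-jdbc/
# restart the Connect worker so the new driver is picked up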

Thank you so much for your help.

Hi matriv,

After adding postgresql-42.7.3.jar, it throws the exception below:

Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'cratedb-connector' is configured with 'delete.enabled=false' and 'pk.mode=kafka' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='Mytopic',partition=0,offset=20,timestamp=1722577031564) with a HashMap value and null value schema.

Hi @gorde.akshay, could you please try adding

 "value.converter.schemas.enable": "true"

to your configuration?
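
With schemas.enable=true the JsonConverter expects every message value to be wrapped in a schema/payload envelope, roughly like this (field names and types are only an illustration, not your actual data):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "eventday", "type": "string", "optional": false },
      { "field": "resourceid", "type": "string", "optional": false },
      { "field": "id", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "eventday": "2024-08-02",
    "resourceid": "abc",
    "id": "42"
  }
}

This only helps if the producer actually writes that envelope; plain JSON values will keep failing with the error from your first post.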

Hi @Baur, I already tried this, but the error below occurred.

Error encountered in task cratedb-connector-0. Executing stage 'VALUE_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='mytopic', partition=0, offset=5, timestamp=1722920943610, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Due to the nature of our data, we cannot specify the schema.

Any help is appreciated.