Update: I am now using the CrateDB JDBC driver, but I don’t know how to transform the data for CrateDB. This is my connector config (JSON):
curl -X POST https://kafka-dev.khacthienit.click/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "cratedb-sink-geo-oid-9",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "mongo.db.collection",
      "connection.url": "jdbc:crate://db_host/db",
      "connection.user": "username",
      "connection.password": "password",
      "auto.create": "true",
      "auto.evolve": "true",
      "insert.mode": "upsert",
      "pk.mode": "record_key",
      "pk.fields": "_id",
      "delete.enabled": "true",
      "transforms": "extractAfter,createKey",
      "transforms.extractAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
      "transforms.extractAfter.field": "after",
      "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.createKey.fields": "_id",
      "errors.tolerance": "all",
      "errors.log.enable": "true",
      "errors.log.include.messages": "true",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false"
    }
  }'
and this is the error I get:
2025-08-15 10:31:08,447 ERROR || Error encountered in task cratedb-sink-geo-oid-90-0. Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.ValueToKey', where consumed record is {topic='mongo.db.collection', partition=0, offset=0, timestamp=1755250245141, timestampType=CreateTime}. [org.apache.kafka.connect.runtime.errors.LogReporter]
2025-08-15 10:31:08,448 ERROR || Error encountered in task cratedb-sink-geo-oid-90-0. Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.ValueToKey', where consumed record is {topic='mongo.db.collection', partition=0, offset=1, timestamp=1755250245141, timestampType=CreateTime}. [org.apache.kafka.connect.runtime.errors.LogReporter]
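
The log lines above only name the failing stage (ValueToKey), not the underlying exception. To reason about what that stage receives, here is a rough Python model of the two transforms from the config, applied in order: extractAfter first, then createKey. It is only a sketch and not the Kafka Connect implementation. The function names and the three sample records are hypothetical, since the real topic contents are not shown in this post. It assumes that ValueToKey can only copy fields out of a map-like value, so it would fail if `after` arrives as a JSON string or is missing.

```python
# Simplified model of the transform chain "extractAfter,createKey".
# Illustration only: this is NOT the actual Kafka Connect code.

def extract_field_value(value, field):
    """Stand-in for ExtractField$Value: replace the value with value[field]."""
    if value is None:
        return None  # a null value passes through unchanged in this model
    return value.get(field)


def value_to_key(value, fields):
    """Stand-in for ValueToKey: build the record key from fields of the value."""
    if not isinstance(value, dict):
        # Assumption: without a schema, only a map can supply key fields.
        raise TypeError(
            f"cannot copy {fields} to key, value is {type(value).__name__}"
        )
    return {name: value.get(name) for name in fields}


def run_chain(value):
    """Apply both transforms in the order given by the 'transforms' setting."""
    after = extract_field_value(value, "after")
    key = value_to_key(after, ["_id"])
    return key, after


# Hypothetical sample records (the real messages are not shown in the post).
ok_record = {"after": {"_id": "abc", "name": "x"}}
string_after = {"after": "{\"_id\": \"abc\"}"}  # 'after' delivered as a JSON string
no_after = {"op": "d"}                          # no 'after' field at all

print(run_chain(ok_record))
for bad in (string_after, no_after):
    try:
        run_chain(bad)
    except TypeError as exc:
        print("fails:", exc)
```

If the messages on mongo.db.collection look like the second or third sample, that would be consistent with an error in the ValueToKey stage, but inspecting an actual message from the topic is the only way to confirm it.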