Hello, I am trying to connect CrateDB to Kafka in order to load the data that is pushed from MongoDB. This is my sink connector JSON file:
ubuntu@ip-172-31-19-57:~$ cat kafka-crate-sink-1.json
{
"name": "crate-sink-113",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mongo.crm.geo",
"connection.url": "jdbc:postgresql://host:5432/CloudStore?stringtype=unspecified&tcpKeepAlive=true",
"connection.username": "username",
"connection.password": "password",
"consumer.override.auto.offset.reset": "earliest",
"insert.mode": "upsert",
"primary.key.mode": "none",
"auto.create": "true",
"auto.evolve": "true",
"transforms": "extractAfter,unwrapJson",
"transforms.extractAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractAfter.field": "after",
"transforms.unwrapJson.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.unwrapJson.field": "payload",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}
But it fails with the following errors:
2025-08-14 05:54:13,284 ERROR || ERROR: Unrecognised Setting: TIMEZONE
io.crate.protocols.postgres.Messages.sendErrorResponse(Messages.java:189)
2025-08-14 05:54:16,357 ERROR || Failed to process record: Failed to process a sink record [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_value' cannot have null schema
2025-08-14 05:55:12,544 ERROR || JDBC sink connector failure [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_value' cannot have null schema
2025-08-14 05:55:12,544 ERROR || WorkerSinkTask{id=crate-sink-113-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_value' cannot have null schema
2025-08-14 05:55:12,545 ERROR || WorkerSinkTask{id=crate-sink-113-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
How can I fix it? This is a sample key and value from the mongo.crm.geo topic:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "mongo.crm.geo.Key"
},
"payload": {
"id": "\"KNA\""
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "before"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "array",
"items": {
"type": "string",
"optional": false
},
"optional": true,
"field": "removedFields"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "updatedFields"
},
{
"type": "array",
"items": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "field"
},
{
"type": "int32",
"optional": false,
"field": "size"
}
],
"optional": false,
"name": "io.debezium.connector.mongodb.changestream.truncatedarray",
"version": 1
},
"optional": true,
"field": "truncatedArrays"
}
],
"optional": true,
"name": "io.debezium.connector.mongodb.changestream.updatedescription",
"version": 1,
"field": "updateDescription"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "string",
"optional": true,
"field": "lsid"
},
{
"type": "int64",
"optional": true,
"field": "txnNumber"
},
{
"type": "int64",
"optional": true,
"field": "wallTime"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source",
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "mongo.crm.geo.Envelope"
},
"payload": {
"before": null,
"after": "{\"_id\": \"KNA\",\"GEO_TYPE_ID\": \"COUNTRY\",\"GEO_NAME\": \"Xanh Kít và Nêvít\",\"GEO_FULL_NAME\": \"Xanh Kít và Nêvít\",\"GEO_PLACE_TYPE_NAME\": \"Quốc gia\",\"GEO_NAME_LOCAL\": \"xanh kit va nevit\",\"GEO_CODE\": \"KNA\",\"VNNIC_ID\": \"KN\",\"STATUS_ID\": 1,\"CREATED_STAMP\": {\"$date\": 1741257345334},\"CREATED_TX_STAMP\": {\"$date\": 1741257345334},\"_class\": \"vn.longvan.core.model.Geo\"}",
"updateDescription": null,
"source": {
"version": "2.7.3.Final",
"connector": "mongodb",
"name": "mongo",
"ts_ms": 0,
"snapshot": "true",
"db": "crm",
"sequence": null,
"ts_us": 0,
"ts_ns": 0,
"collection": "geo",
"ord": -1,
"lsid": null,
"txnNumber": null,
"wallTime": null
},
"op": "r",
"ts_ms": 1755078735685,
"transaction": null
}
}
And I pushed the MongoDB data to Kafka with the Debezium MongoDB source connector:
curl -k -X POST https://domain/connectors -H "Content-Type: application/json" -d '{
"name": "mongodb-source-crm",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.connection.string": "mongodb://username:password@host_rs/?replicaSet=rs0&readPreference=secondary",
"mongodb.name": "mongo",
"database.include.list": "crm",
"collection.include.list": ".*",
"tasks.max": "1",
"topic.prefix": "mongo",
"snapshot.mode": "initial",
"heartbeat.interval.ms": "10000",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
}'