Kafka sink connector failed

Hello, I tried to connect CrateDB with Kafka to get data pushed from MongoDB. This is my JSON file:


ubuntu@ip-172-31-19-57:~$ cat kafka-crate-sink-1.json
{
  "name": "crate-sink-113",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "mongo.crm.geo",

    "connection.url": "jdbc:postgresql://host:5432/CloudStore?stringtype=unspecified&tcpKeepAlive=true",
    "connection.username": "username",
    "connection.password": "password",

    "consumer.override.auto.offset.reset": "earliest",
    "insert.mode": "upsert",
    "primary.key.mode": "none",

    "auto.create": "true",
    "auto.evolve": "true",

    "transforms": "extractAfter,unwrapJson",
    "transforms.extractAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractAfter.field": "after",

    "transforms.unwrapJson.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.unwrapJson.field": "payload",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}

but it fails with this error:


2025-08-14 05:54:13,284 ERROR  ||  ERROR: Unrecognised Setting: TIMEZONE
io.crate.protocols.postgres.Messages.sendErrorResponse(Messages.java:189)
2025-08-14 05:54:16,357 ERROR  ||  Failed to process record: Failed to process a sink record   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_value' cannot have null schema
2025-08-14 05:55:12,544 ERROR  ||  JDBC sink connector failure   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_value' cannot have null schema
2025-08-14 05:55:12,544 ERROR  ||  WorkerSinkTask{id=crate-sink-113-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_value' cannot have null schema
2025-08-14 05:55:12,545 ERROR  ||  WorkerSinkTask{id=crate-sink-113-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]

How can I fix it? These are the key and value of a record in the mongo.crm.geo topic:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false,
        "name": "mongo.crm.geo.Key"
    },
    "payload": {
        "id": "\"KNA\""
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "string",
                "optional": true,
                "name": "io.debezium.data.Json",
                "version": 1,
                "field": "before"
            },
            {
                "type": "string",
                "optional": true,
                "name": "io.debezium.data.Json",
                "version": 1,
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "optional": false
                        },
                        "optional": true,
                        "field": "removedFields"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Json",
                        "version": 1,
                        "field": "updatedFields"
                    },
                    {
                        "type": "array",
                        "items": {
                            "type": "struct",
                            "fields": [
                                {
                                    "type": "string",
                                    "optional": false,
                                    "field": "field"
                                },
                                {
                                    "type": "int32",
                                    "optional": false,
                                    "field": "size"
                                }
                            ],
                            "optional": false,
                            "name": "io.debezium.connector.mongodb.changestream.truncatedarray",
                            "version": 1
                        },
                        "optional": true,
                        "field": "truncatedArrays"
                    }
                ],
                "optional": true,
                "name": "io.debezium.connector.mongodb.changestream.updatedescription",
                "version": 1,
                "field": "updateDescription"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ns"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "collection"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ord"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "lsid"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txnNumber"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "wallTime"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.mongo.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": true,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "name": "event.block",
                "version": 1,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "mongo.crm.geo.Envelope"
    },
    "payload": {
        "before": null,
        "after": "{\"_id\": \"KNA\",\"GEO_TYPE_ID\": \"COUNTRY\",\"GEO_NAME\": \"Xanh Kít và Nêvít\",\"GEO_FULL_NAME\": \"Xanh Kít và Nêvít\",\"GEO_PLACE_TYPE_NAME\": \"Quốc gia\",\"GEO_NAME_LOCAL\": \"xanh kit va nevit\",\"GEO_CODE\": \"KNA\",\"VNNIC_ID\": \"KN\",\"STATUS_ID\": 1,\"CREATED_STAMP\": {\"$date\": 1741257345334},\"CREATED_TX_STAMP\": {\"$date\": 1741257345334},\"_class\": \"vn.longvan.core.model.Geo\"}",
        "updateDescription": null,
        "source": {
            "version": "2.7.3.Final",
            "connector": "mongodb",
            "name": "mongo",
            "ts_ms": 0,
            "snapshot": "true",
            "db": "crm",
            "sequence": null,
            "ts_us": 0,
            "ts_ns": 0,
            "collection": "geo",
            "ord": -1,
            "lsid": null,
            "txnNumber": null,
            "wallTime": null
        },
        "op": "r",
        "ts_ms": 1755078735685,
        "transaction": null
    }
} 
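In the value above, after is declared with the semantic type io.debezium.data.Json, which means it travels as a string containing an encoded JSON document, not as a nested struct. The following minimal Python sketch (payload abbreviated from the record above) shows that the field has to be decoded before columns such as _id are reachable, and that dates arrive in MongoDB extended-JSON form ({"$date": epoch milliseconds}) rather than as timestamps.

```python
import json
from datetime import datetime, timedelta, timezone

# Abbreviated copy of the "payload" part of the record value above.
payload = {
    "before": None,
    "after": "{\"_id\": \"KNA\",\"GEO_TYPE_ID\": \"COUNTRY\",\"STATUS_ID\": 1,"
             "\"CREATED_STAMP\": {\"$date\": 1741257345334}}",
    "op": "r",
}

after = payload["after"]
print(type(after).__name__)   # str: the document is a JSON string, not a struct

doc = json.loads(after)       # decode the embedded MongoDB document
print(doc["_id"])             # KNA

# {"$date": ...} holds epoch milliseconds; convert it to an aware UTC datetime.
millis = doc["CREATED_STAMP"]["$date"]
created = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=millis)
print(created)                # 2025-03-06 10:35:45.334000+00:00
```

Any sink that should write real columns into CrateDB needs a step that performs this decoding; a plain JSON converter alone leaves the whole document in one string field.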

And this is how I pushed the MongoDB data to Kafka with Debezium:

curl -k -X POST https://domain/connectors -H "Content-Type: application/json" -d '{
  "name": "mongodb-source-crm",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://username:password@host_rs/?replicaSet=rs0&readPreference=secondary",
    "mongodb.name": "mongo",
    "database.include.list": "crm",
    "collection.include.list": ".*",
    "tasks.max": "1",
    "topic.prefix": "mongo",
    "snapshot.mode": "initial",
    "heartbeat.interval.ms": "10000",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}'
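The source connector writes values with value.converter.schemas.enable set to "true", so every record in the topic is a {schema, payload} envelope, while the first sink config reads them with schemas.enable set to "false". A tiny helper like the hypothetical converter_mismatches below (not part of Kafka Connect) can flag such differences between two connector configs before deploying. It only compares explicitly set keys; a missing key (None) means the worker-level default applies and should be checked separately.

```python
# Value-converter settings copied from the source connector above and the first sink config.
SOURCE = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
}
SINK = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}

CONVERTER_KEYS = ("value.converter", "value.converter.schemas.enable")


def converter_mismatches(source: dict, sink: dict) -> dict:
    """Return {key: (source_value, sink_value)} for converter settings that differ.

    None means the key is not set, so the worker default applies on that side.
    """
    return {
        key: (source.get(key), sink.get(key))
        for key in CONVERTER_KEYS
        if source.get(key) != sink.get(key)
    }


print(converter_mismatches(SOURCE, SINK))
# {'value.converter.schemas.enable': ('true', 'false')}
```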


Hi Mỹ Duyên,

thanks for writing in. I’ve asked a colleague about the problem you might be observing, and he had a quick suggestion:

My suspicion is they are sending records with an empty record body, i.e. the topic in Kafka has an empty JSON body?

Maybe this already helps you? Otherwise, we will need to take a closer look later.

With kind regards,
Andreas.

I see that you are connecting using the vanilla pgJDBC driver, right? I am not sure whether that one works, but the CrateDB JDBC driver could possibly come to the rescue if the problem is related to the driver in any way.

Hello amoth,

I checked the data in the topic after sending it with Debezium, and I can confirm it is the same as in my previous comment. I think the error happens because CrateDB can’t understand the format of the data. Can you suggest a config file for a CrateDB sink?

I saw a video demo of Kafka and Debezium with MSSQL and CrateDB. I noticed your team used the PostgreSQL JDBC driver, so I used it too, but it doesn’t work.

Update: I now use the CrateDB JDBC driver, but I don’t know how to transform the data for CrateDB. This is my JSON file:

curl -X POST https://kafka-dev.khacthienit.click/connectors \
-H "Content-Type: application/json" \
-d '{
  "name": "cratedb-sink-geo-oid-9",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "mongo.db.collection",

    "connection.url": "jdbc:crate://db_host/db",
    "connection.user": "username",
    "connection.password": "password",

    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "_id",
    "delete.enabled": "true",

    "transforms": "extractAfter,createKey",
    "transforms.extractAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractAfter.field": "after",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "_id",

    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}'

and it fails with this error:

2025-08-15 10:31:08,447 ERROR  ||  Error encountered in task cratedb-sink-geo-oid-90-0. Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.ValueToKey', where consumed record is {topic='mongo.db.collection', partition=0, offset=0, timestamp=1755250245141, timestampType=CreateTime}.   [org.apache.kafka.connect.runtime.errors.LogReporter]
2025-08-15 10:31:08,448 ERROR  ||  Error encountered in task cratedb-sink-geo-oid-90-0. Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.ValueToKey', where consumed record is {topic='mongo.db.collection', partition=0, offset=1, timestamp=1755250245141, timestampType=CreateTime}.   [org.apache.kafka.connect.runtime.errors.LogReporter]
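The failing stage is ValueToKey, which in schemaless mode only accepts a map (and a struct when a schema is present). The sketch below models the two transforms in Python to show why the chain ExtractField(after) followed by ValueToKey(_id) cannot succeed on these records: with schemas.enable=false the value is the whole envelope, so "after" is not found at the top level, and even the unwrapped "after" is a JSON string. The function names are hypothetical stand-ins, not Kafka Connect's code, and the final decoding step only approximates what a MongoDB-aware SMT such as Debezium's ExtractNewDocumentState is meant to handle.

```python
import json


def extract_field(value, field):
    """Schemaless model of ExtractField$Value: returns value[field], or None if absent."""
    if value is None:
        return None
    if not isinstance(value, dict):
        raise TypeError(f"ExtractField expects a map, got {type(value).__name__}")
    return value.get(field)


def value_to_key(value, fields):
    """Schemaless model of ValueToKey: builds a key map from fields of the value."""
    if not isinstance(value, dict):
        raise TypeError(f"ValueToKey expects a map, got {type(value).__name__}")
    return {f: value[f] for f in fields}


# Abbreviated envelope as the sink sees it with value.converter.schemas.enable=false.
envelope = {"schema": {"type": "struct"}, "payload": {"after": "{\"_id\": \"KNA\"}"}}

# 1) "after" is nested under "payload", so the top-level lookup yields None.
try:
    value_to_key(extract_field(envelope, "after"), ["_id"])
except TypeError as err:
    print(err)   # ValueToKey expects a map, got NoneType

# 2) Even the unwrapped "after" is a JSON string, not a map.
try:
    value_to_key(extract_field(envelope["payload"], "after"), ["_id"])
except TypeError as err:
    print(err)   # ValueToKey expects a map, got str

# 3) Decoding the document first gives ValueToKey something it can use.
doc = json.loads(extract_field(envelope["payload"], "after"))
print(value_to_key(doc, ["_id"]))   # {'_id': 'KNA'}
```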

Hi again,

thanks for trying to make it work. Apologies that our Java experts are currently on vacation, but please don’t fret – we’ll see what we can do for you.

May we ask if you are strictly bound to using Kafka for relaying that data from MongoDB? Is there a chance you could tap into it directly? As you can see at Using CrateDB Toolkit to sync MongoDB with CrateDB, that route is much more likely to get you past such teething problems quickly.

Please let us know if using different technologies could be an option for you.

With kind regards,
Andreas.

but it’s error

Still, let’s get back to analyzing your error report.

The error message "Configured primary key mode 'record_value' cannot have null schema" reads like a configuration flaw in Kafka Connect, unrelated to CrateDB. Is that possible?
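One way to see why this points at the Kafka Connect configuration: the JsonConverter only hands a schema to the sink when schemas.enable is true and the message is a {schema, payload} envelope. With schemas.enable=false the whole envelope becomes a schemaless value, so a primary key mode based on the record value has no schema to work with, and a top-level lookup of "after" finds nothing (which may also fit the "empty record body" suspicion). The Python below is a simplified model of that behavior written for illustration, not the converter's actual code; the sample envelope is abbreviated from the record posted in this thread, and require_value_schema is a hypothetical stand-in for the sink-side check.

```python
import json

# Abbreviated Debezium envelope as stored in the topic by a source that uses
# value.converter.schemas.enable=true (schema trimmed for brevity).
RAW = json.dumps({
    "schema": {"type": "struct", "name": "mongo.crm.geo.Envelope", "fields": []},
    "payload": {"before": None, "after": "{\"_id\": \"KNA\"}", "op": "r"},
}).encode("utf-8")


def deserialize_value(raw: bytes, schemas_enable: bool):
    """Simplified model of JsonConverter: returns (schema, value)."""
    doc = json.loads(raw)
    if schemas_enable:
        # With schemas.enable=true the message must be exactly {schema, payload}.
        if not (isinstance(doc, dict) and set(doc) == {"schema", "payload"}):
            raise ValueError('expected exactly "schema" and "payload" fields')
        return doc["schema"], doc["payload"]
    # With schemas.enable=false the whole document is a schemaless value.
    return None, doc


def require_value_schema(schema):
    """Hypothetical model of the sink-side check behind the reported error."""
    if schema is None:
        raise ValueError("Configured primary key mode 'record_value' cannot have null schema")


# Sink configured like the post: value.converter.schemas.enable=false.
schema, value = deserialize_value(RAW, schemas_enable=False)
print(schema)                 # None: no schema reaches the sink
print(sorted(value))          # ['payload', 'schema']: the envelope became the value
print(value.get("after"))     # None: "after" is nested under "payload"

# Matching the source setting (schemas.enable=true) yields a schema again.
schema, value = deserialize_value(RAW, schemas_enable=True)
require_value_schema(schema)  # passes
```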

^^ But wait, what is that?

That’s a good idea. Let us check our resources to see if we can find one. If it’s not easy to discover, it’s an excellent opportunity to improve our documentation.
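As a starting point only, here is one possible shape for a Debezium JDBC sink configuration for the mongo.crm.geo topic, written as a Python dict so it can be serialized and POSTed to the Kafka Connect REST API like the curl commands in this thread. Assumptions: the connector name is hypothetical, the host and credentials are placeholders, the MongoDB-specific ExtractNewDocumentState SMT is used to flatten the document, and _id serves as the primary key via record_value. This has not been tested against CrateDB and does not address the separate TIMEZONE error.

```python
import json

sink_config = {
    "name": "cratedb-sink-geo",  # hypothetical connector name
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "mongo.crm.geo",
        # Placeholders, as in the original post.
        "connection.url": "jdbc:postgresql://host:5432/CloudStore",
        "connection.username": "username",
        "connection.password": "password",
        # Read the {schema, payload} envelopes exactly as the source wrote them.
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        # Turn the MongoDB change event ("after" as a JSON string) into a flat struct.
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
        # Upsert rows keyed by the document's _id taken from the flattened value.
        "insert.mode": "upsert",
        "primary.key.mode": "record_value",
        "primary.key.fields": "_id",
        "schema.evolution": "basic",
    },
}

print(json.dumps(sink_config, indent=2))
```

The key design choice is keeping schemas.enable consistent with the source connector, so the sink receives a schema and the SMT can rebuild the document as typed fields.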

Please also understand that people regularly struggle with the Kafka JDBC sink pipeline element, maybe because its configuration is not the easiest, and maybe because our documentation is suboptimal.

In that sense, you are in good company, but we would still like to apologize if a lack of documentation on our end causes so much struggle, and of course we will try to improve. Based on your case and previous ones, we should certainly enhance the canonical documentation pages on these topics.

I can see others might be struggling with similar details around timezones as well.

However, in this case, this particular message originates from CrateDB.

cr> select current_setting('TIMEZONE');
SQLParseException[Unrecognised Setting: TIMEZONE]

However, this error is ignored in most cases, so I guess we should focus on this error message again:

Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_value' cannot have null schema

Hiii, this solution only copies the database, but I want to sync updates from MongoDB to CrateDB in real time ^^
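Real-time replication with Debezium means applying each change event rather than copying a snapshot: op "r" (snapshot read), "c" (create) and "u" (update) upsert the document, and "d" deletes it, which is what insert.mode=upsert plus delete handling in the sink aim to do. The in-memory Python model below illustrates those semantics. apply_change_event and the VNM record are hypothetical, and the sketch assumes the MongoDB connector's default full-document update mode, so that update events carry the whole document in after.

```python
import json


def apply_change_event(table: dict, key_id: str, event: dict) -> None:
    """Apply one simplified Debezium MongoDB change event to an in-memory table."""
    op = event["op"]
    if op in ("r", "c", "u"):
        # Upsert: the full document arrives as a JSON string in "after".
        table[key_id] = json.loads(event["after"])
    elif op == "d":
        # Delete: "after" is null, the key identifies the row to remove.
        table.pop(key_id, None)
    else:
        raise ValueError(f"unknown op {op!r}")


table = {}
events = [
    ("KNA", {"op": "r", "after": '{"_id": "KNA", "STATUS_ID": 1}'}),
    ("KNA", {"op": "u", "after": '{"_id": "KNA", "STATUS_ID": 0}'}),
    ("VNM", {"op": "c", "after": '{"_id": "VNM", "STATUS_ID": 1}'}),  # hypothetical record
    ("KNA", {"op": "d", "after": None}),
]
for key_id, event in events:
    apply_change_event(table, key_id, event)

print(table)  # {'VNM': {'_id': 'VNM', 'STATUS_ID': 1}}
```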

Yesss, I couldn’t fix this error this week :<<

^^ haha, this is the error I got when using the PostgreSQL JDBC driver

Hi again,

So we’ve figured out you are using Debezium for the job? Maybe ingredients from the other tutorial, Replicating data to CrateDB with Debezium and Kafka, can help you get a better outcome?

I am very sorry, I am not a Debezium/Kafka expert, but as far as I can correlate the configuration you shared here, in particular that fragment…

with the error message in focus…

Caused by: org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_value' cannot have null schema

and then compare it against this fragment from the other tutorial…

… we can see a deviation: the pk.mode setting refers to record_key instead of record_value as in the example blueprint. Is it possible that the cause of your problems lies in that area of the destination connector configuration?
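For orientation, the JDBC sinks discussed here take primary key columns from different places depending on the mode: none uses no key, kafka uses the topic/partition/offset coordinates, record_key uses fields of the record key, and record_value uses fields of the record value (which must have a schema). The Python function below is a simplified, hypothetical model of that selection applied to the record from this thread. Note that the key struct shown in the topic stores the id as the quoted string "\"KNA\"", while the flattened value (assumed here) has _id = KNA.

```python
def primary_key(mode, record, fields=None):
    """Hypothetical model of how pk.mode / primary.key.mode selects key columns."""
    if mode == "none":
        return None                                   # no primary key at all
    if mode == "kafka":
        # Kafka coordinates identify the row.
        return (record["topic"], record["partition"], record["offset"])
    if mode not in ("record_key", "record_value"):
        raise ValueError(f"unknown mode {mode!r}")
    part = record["key"] if mode == "record_key" else record["value"]
    if not isinstance(part, dict):
        # Roughly what "... cannot have null schema" complains about.
        raise ValueError(f"mode {mode!r} needs a structured record {mode[7:]}")
    names = fields or list(part)                      # default: all fields
    return {name: part[name] for name in names}


record = {
    "topic": "mongo.crm.geo", "partition": 0, "offset": 0,
    "key": {"id": "\"KNA\""},                          # key struct as shown in the topic
    "value": {"_id": "KNA", "GEO_CODE": "KNA"},        # value after flattening (assumed)
}
print(primary_key("kafka", record))                  # ('mongo.crm.geo', 0, 0)
print(primary_key("record_key", record))             # {'id': '"KNA"'}
print(primary_key("record_value", record, ["_id"]))  # {'_id': 'KNA'}
```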

With kind regards,
Andreas.

Hello, thank you for your support. I will try this solution next Monday ^^. Hope you have a great day!