Is there any way to insert data from a Kafka topic into CrateDB using Kafka Connect?

Any help appreciated.


Hi, we have this Kafka Connect integration guide: Data Ingestion using Kafka and Kafka Connect


I have configured the connector with the JSON file below

{
  "name": "cratedb-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "mytopic",
    "connection.url": "jdbc:crate://localhost:5432/gra?user=crate",
    "tasks.max": "4",
    "insert.mode": "insert",
    "table.name.format": "mytable",
    "pk.mode": "kafka",
    "pk.fields": "eventday,resourceid,id",
    "transforms": "toJSON,wrapValue,addAuditDate",
    "transforms.toJSON.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value",
    "transforms.toJSON.schemas.enable": "false",
    "transforms.wrapValue.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.wrapValue.field": "attributes",
    "transforms.addAuditDate.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addAuditDate.timestamp.field": "AUDITDATE",
    "transforms.addAuditDate.timestamp.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}

but it gives an exception:

Caused by: org.apache.kafka.connect.errors.DataException:
JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.
If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Any help appreciated.

Maybe try:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
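
Note that schemas.enable=true only works if every message on the topic is wrapped in the schema/payload envelope that JsonConverter expects. A message would have to look roughly like this (the field names here are just illustrative, borrowed from your pk.fields):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "id", "type": "string", "optional": false },
      { "field": "eventday", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "id": "42",
    "eventday": "2021-01-01"
  }
}
```

If your producers emit plain JSON without that envelope, enabling schemas on the converter will trigger exactly the DataException you posted.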

For the key converter I have also seen

"key.converter": "org.apache.kafka.connect.storage.StringConverter"
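
Since your topic apparently carries plain JSON, the other option (which the exception message itself suggests) is to keep JsonConverter for values but disable schemas. A minimal sketch of the worker/connector converter settings, assuming string keys:

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

The trade-off is that without a schema the JDBC sink cannot derive column types from the message, so the target table generally needs to exist already with auto.create disabled.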