Is there any way to insert data from Kafka topic to the crate using Kafka connect?
Any help appreciated.
1 Like
Hi, We have this kafka-connect integration guide Data Ingestion using Kafka and Kafka Connect
1 Like
I have configured json file below
{
"name": "cratedb-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "mytopic",
"connection.url": "jdbc:crate://localhost:5432/gra?user=crate",
"tasks.max": "4",
"insert.mode": "insert",
"table.name.format": "mytable",
"pk.mode": "kafka",
"pk.fields": "eventday,resourceid,id",
"transforms": "toJSON,wrapValue",
"transforms.toJSON.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value",
"transforms.toJSON.schemas.enable": "false",
"transforms.wrapValue.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.wrapValue.field": "attributes",
"transforms":"addAuditDate",
"transforms.addAuditDate.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addAuditDate.timestamp.field":"AUDITDATE",
"transforms.addAuditDate.timestamp.format":"yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}
but it gives exception
Caused by: org.apache.kafka.connect.errors.DataException:
JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.
If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Any help appreciated.
Maybe try
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
For key converter I also have seen
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”