We are having issues using Confluent's JDBC Kafka Sink connector to insert records into CrateDB. Our dataset has a few dimensions, a couple of which are formatted as JSON. We created our table in CrateDB as follows:
CREATE TABLE webservicetests.test(
"ID" STRING,
"TIMESTAMP" TIMESTAMP WITHOUT TIME ZONE,
"IP" STRING,
"ISPINFO" OBJECT,
"EXTRA" OBJECT,
"UA" STRING,
"LANG" STRING,
"DL" REAL,
"UL" REAL,
"PING" REAL,
"JITTER" REAL,
"LOG" STRING,
"MONTH" TIMESTAMP WITHOUT TIME ZONE GENERATED ALWAYS AS date_trunc('month', "TIMESTAMP")
)
PARTITIONED BY ("MONTH") WITH (column_policy = 'dynamic');
The ISPINFO and EXTRA columns contain JSON. When the JDBC connector pushes a record, it casts these two columns to the json type, and CrateDB rejects the statement because it does not recognize that type. This is the error message from the connector side:
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test" ("ID","TIMESTAMP","IP","ISPINFO","EXTRA","UA","LANG","DL","UL","PING","JITTER","LOG") VALUES (23,1619789224920,'23.23.23.23','{"processedString":"23.23.23.23 - Data Centers Inc., CA (1060 km)","rawIspInfo":{"ip":"23.23.23.23","city":"Montréal","region":"Quebec","country":"CA","loc":"23.568,-73.5878","org":"Data Centers Inc.","postal":"H2W","timezone":"America/Toronto","readme":"https://ipinfo.io/missingauth"}}'::json,'{"server":"test_machine"}'::json,'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36','en-US,en;q=0.9',42.37,8.43,69.64,25.52,'') was aborted: ERROR: Cannot find data type: json Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: Cannot find data type: json
org.postgresql.util.PSQLException: ERROR: Cannot find data type: json
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
Do we need to change the way we create the table, perhaps using a different column type?
We also tried executing the same INSERT statement manually, substituting ::json with ::object, and it worked. Is there a workaround for this?
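For reference, this is roughly the manual test that worked, i.e. the statement from the error log with the connector's ::json casts replaced by ::object (the long JSON and user-agent values are abbreviated here for readability):

```sql
INSERT INTO webservicetests.test
    ("ID", "TIMESTAMP", "IP", "ISPINFO", "EXTRA",
     "UA", "LANG", "DL", "UL", "PING", "JITTER", "LOG")
VALUES
    ('23', 1619789224920, '23.23.23.23',
     '{"processedString": "...", "rawIspInfo": {"ip": "23.23.23.23"}}'::object,
     '{"server": "test_machine"}'::object,
     'Mozilla/5.0 ...', 'en-US,en;q=0.9',
     42.37, 8.43, 69.64, 25.52, '');
```

So CrateDB accepts the payload once the cast target is object instead of json; the problem seems to be only the type name the connector emits.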