I have the Confluent JDBC Kafka Connector sinking data into CrateDB, and it mostly works fine. I have schema evolution enabled on the connector, so when I add a new field to my JSON, the schema registry is updated successfully and the connector then tries to update my table in CrateDB. My full table name is webservicetests.netflow, and the connector is set up roughly along the lines of the sketch below; the error output from the connector follows after that.
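For context, a minimal sketch of the sink settings I mean, using the standard Confluent JDBC sink properties; the connection URL, topic name, converter, and host names here are placeholders from memory rather than my exact values:

    name=cratedb-netflow-sink
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    # CrateDB is reached over the PostgreSQL wire protocol via the PostgreSQL JDBC driver
    connection.url=jdbc:postgresql://<cratedb-host>:5432/webservicetests
    topics=netflow
    table.name.format=netflow
    pk.mode=none
    insert.mode=insert
    # schema evolution: let the connector issue ALTER TABLE when new fields appear
    auto.create=true
    auto.evolve=true
    # records carry a schema through the registry (JSON Schema in my case)
    value.converter=io.confluent.connect.json.JsonSchemaConverter
    value.converter.schema.registry.url=http://<schema-registry-host>:8081

And this is what the connector logs when a record with the new APPLICATION field arrives: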
[2021-04-22 16:23:12,599] INFO Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='APPLICATION', isPrimaryKey=false}] among column names [AS_SRC, AS_DST, WRITER_ID, IP_SRC, TOS, EVENT_TYPE, TIMESTAMP_END, IP_DST, IFACE_OUT, BYTES, PORT_SRC, PACKETS, TCP_FLAGS, PORT_DST, IFACE_IN, IP_PROTO, TIMESTAMP_START, MONTH] (io.confluent.connect.jdbc.sink.DbStructure:273)
[2021-04-22 16:23:12,602] INFO Amending TABLE to add missing fields:[SinkRecordField{schema=Schema{STRING}, name='APPLICATION', isPrimaryKey=false}] maxRetries:10 with SQL: [ALTER TABLE "netflow" ADD "APPLICATION" TEXT NULL] (io.confluent.connect.jdbc.sink.DbStructure:199)
[2021-04-22 16:23:12,608] WARN Amend failed, re-attempting (io.confluent.connect.jdbc.sink.DbStructure:220)
org.postgresql.util.PSQLException: ERROR: line 1:46: extraneous input 'NULL' expecting {<EOF>, ';'}
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2510)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2245)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:311)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:246)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1189)
at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:207)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:80)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2021-04-22 16:23:12,740] INFO Checking PostgreSql dialect for type of TABLE "netflow" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:840)
[2021-04-22 16:23:12,754] INFO Refreshing metadata for table "netflow" to Table{name='"netflow"', type=TABLE columns=[Column{'AS_SRC', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'AS_DST', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'WRITER_ID', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'IP_SRC', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TOS', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'EVENT_TYPE', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TIMESTAMP_END', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}, Column{'IP_DST', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'IFACE_OUT', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'BYTES', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'PORT_SRC', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'PACKETS', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'TCP_FLAGS', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'PORT_DST', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'IFACE_IN', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'IP_PROTO', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TIMESTAMP_START', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}, Column{'MONTH', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}]} (io.confluent.connect.jdbc.util.TableDefinitions:86)
[2021-04-22 16:23:12,754] INFO Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='APPLICATION', isPrimaryKey=false}] among column names [AS_SRC, AS_DST, WRITER_ID, IP_SRC, TOS, EVENT_TYPE, TIMESTAMP_END, IP_DST, IFACE_OUT, BYTES, PORT_SRC, PACKETS, TCP_FLAGS, PORT_DST, IFACE_IN, IP_PROTO, TIMESTAMP_START, MONTH] (io.confluent.connect.jdbc.sink.DbStructure:273)
[2021-04-22 16:23:12,754] INFO Amending TABLE to add missing fields:[SinkRecordField{schema=Schema{STRING}, name='APPLICATION', isPrimaryKey=false}] maxRetries:9 with SQL: [ALTER TABLE "netflow" ADD "APPLICATION" TEXT NULL] (io.confluent.connect.jdbc.sink.DbStructure:199)
[2021-04-22 16:23:12,755] WARN Amend failed, re-attempting (io.confluent.connect.jdbc.sink.DbStructure:220)
org.postgresql.util.PSQLException: ERROR: line 1:46: extraneous input 'NULL' expecting {<EOF>, ';'}
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2510)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2245)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:311)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:246)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1189)
at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:207)
at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:223)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:80)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
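For reference, the statement the connector keeps generating is copied from the log above; the second statement is only my assumption about what CrateDB's ALTER TABLE syntax would accept (no NULL column constraint), not something the connector produced:

    -- generated by the connector; fails with "extraneous input 'NULL'"
    ALTER TABLE "netflow" ADD "APPLICATION" TEXT NULL;

    -- what I would have expected to work against CrateDB (my assumption)
    ALTER TABLE "netflow" ADD COLUMN "APPLICATION" TEXT;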
Can someone shed some light on this, please? Why am I not able to ALTER my table in CrateDB?
Thanks,