We are inserting data into CrateDB from Kafka using the Confluent Kafka JDBC Sink connector. We noticed that ingestion stopped a few hours ago, and when we ran the sink process manually we saw the following error:
[2021-05-11 01:21:54,372] INFO [Consumer clientId=connector-consumer-cratedb-connector-netflow-0, groupId=connect-cratedb-connector-netflow] Resetting offset for partition NETFLOW_AVRO-0 to position FetchPosition{offset=17370324, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[questdb:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:396)
[2021-05-11 01:22:06,170] INFO Attempting to open connection #1 to PostgreSql (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)
[2021-05-11 01:22:06,428] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:56)
[2021-05-11 01:22:06,599] INFO Checking PostgreSql dialect for existence of TABLE "netflow" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:570)
[2021-05-11 01:22:06,621] INFO Using PostgreSql dialect TABLE "netflow" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:578)
[2021-05-11 01:22:06,782] INFO Checking PostgreSql dialect for type of TABLE "netflow" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:840)
[2021-05-11 01:22:06,803] INFO Setting metadata for table "netflow" to Table{name='"netflow"', type=TABLE columns=[Column{'AS_SRC', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'AS_DST', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'WRITER_ID', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'IP_SRC', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TOS', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'EVENT_TYPE', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TIMESTAMP_END', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}, Column{'IP_DST', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'IFACE_OUT', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'BYTES', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'PORT_SRC', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'PACKETS', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'TCP_FLAGS', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TAG', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'PORT_DST', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'LABEL', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'IFACE_IN', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'IP_PROTO', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TIMESTAMP_START', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}, Column{'MONTH', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)
[2021-05-11 01:22:06,840] ERROR WorkerSinkTask{id=cratedb-connector-netflow-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Update count (0) did not sum up to total number of records inserted (126) (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
org.apache.kafka.connect.errors.ConnectException: Update count (0) did not sum up to total number of records inserted (126)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:196)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2021-05-11 01:22:06,841] ERROR WorkerSinkTask{id=cratedb-connector-netflow-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Update count (0) did not sum up to total number of records inserted (126)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:196)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
[2021-05-11 01:22:06,842] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:161)
[2021-05-11 01:22:06,842] INFO Closing connection #1 to PostgreSql (io.confluent.connect.jdbc.util.CachedConnectionProvider:108)
[2021-05-11 01:22:06,843] INFO [Consumer clientId=connector-consumer-cratedb-connector-netflow-0, groupId=connect-cratedb-connector-netflow] Revoke previously assigned partitions NETFLOW_AVRO-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)
[2021-05-11 01:22:06,843] INFO [Consumer clientId=connector-consumer-cratedb-connector-netflow-0, groupId=connect-cratedb-connector-netflow] Member connector-consumer-cratedb-connector-netflow-0-53fd209a-a4e3-4afe-b449-15f3d7f93ab8 sending LeaveGroup request to coordinator questdb:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1029)
[2021-05-11 01:22:06,847] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[2021-05-11 01:22:06,847] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[2021-05-11 01:22:06,847] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[2021-05-11 01:22:06,852] INFO App info kafka.consumer for connector-consumer-cratedb-connector-netflow-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
Some forum posts suggest that the records being inserted are duplicates, i.e. a primary key violation, and that removing the offending records from the destination table resolved the issue. In our case the table has no primary key, so something else must be going wrong.
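For context, a JDBC sink configuration for this kind of pipeline looks roughly like the one below. The connection URL, credentials, and the insert.mode/auto.create values are illustrative placeholders rather than a copy of our exact config; the connector name, topic, and table match the log above, and a table without a primary key would normally run with pk.mode=none:

name=cratedb-connector-netflow
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=NETFLOW_AVRO
connection.url=jdbc:postgresql://<cratedb-host>:5432/doc
connection.user=crate
table.name.format=netflow
insert.mode=insert
pk.mode=none
auto.create=false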
We also checked crate.log but did not see any related entries; we are not sure whether we need to raise the log level to capture more detail.
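If raising the verbosity would help, we assume it would be something along these lines, based on CrateDB's runtime logger settings (please correct us if this is not the right logger name or syntax):

SET GLOBAL TRANSIENT "logger.io.crate" = 'DEBUG';
-- and revert once we have captured the failure:
RESET GLOBAL "logger.io.crate";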
Has anyone experienced this issue before?
Thanks