Kafka Connect stops sinking data to CrateDB after adding more than 100 topics

Hi Team, we are using CrateDB 5.10.2 with Kafka Connect to sink data into CrateDB. The initially created connector ran normally and automatically created nineteen tables in CrateDB. After adding more topics to Kafka Connect, the connector no longer sinks any data to CrateDB and the consumer lag keeps increasing, yet there are no errors in connect.log or crate.log.

Any help is appreciated, thanks.

Connector configuration:

{
	"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
	"table.name.format": "${topic}",
	"transforms.dropPrefix.replacement": "$2.$3",
	"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
	"transforms": "dropPrefix,flatten,Cast",
	"transforms.dropPrefix.regex": "([^.]+).([^.]+).([^.]+)",
	"topics.regex": "pgdb_biz.so.([^.]+),pgdb_biz.qto.([^.]+),pgdb_biz.dock.([^.]+)",
	"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
	"transforms.flatten.delimiter": "_",
	"value.converter.schema.registry.url": "http://<schema-registry-host-1>:18081,http://<schema-registry-host-2>:18081,http://<schema-registry-host-3>:18081",
	"delete.enabled": "false",
	"auto.evolve": "true",
	"name": "sink-pgdb_biz_to_cratedb-connector",
	"auto.create": "true",
	"transforms.Cast.spec": "before_req_date:string,before_plan_delivery_date:string,after_req_date:string,after_plan_delivery_date:string,before_order_date:string,after_order_date:string",
	"connection.url": "jdbc:postgresql://<database-host>:<port>/<database-name>?user=<username>&password=<password>",
	"value.converter": "io.confluent.connect.avro.AvroConverter",
	"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
	"insert.mode": "insert",
	"key.converter": "io.confluent.connect.avro.AvroConverter",
	"key.converter.schema.registry.url": "http://<schema-registry-host-1>:18081,http://<schema-registry-host-2>:18081,http://<schema-registry-host-3>:18081"
}

The tables below were created when the connector was first created:

cr> \dt
+----------------------------------+
| name                             |
+----------------------------------+
| dock.so_header                   |
| dock.so_line                     |
| qto.qo_header                    |
| qto.qo_line                      |
| so.so_comm_audit_req_ext         |
| so.so_header                     |
| so.so_header_ext                 |
| so.so_header_invoice_ext         |
| so.so_line                       |
| so.so_line_mark_detail           |
| so.so_line_patent_ext            |
| so.so_line_priority_prepare_data |
| so.so_line_relation_detail       |
| so.so_line_sub_item              |
| so.so_line_trace_ext             |
| so.so_rel_contact_detail         |
| so.state_machine_def             |
| so.state_machine_task            |
| so.state_machine_task_log        |
+----------------------------------+
SELECT 19 rows in set (0.004 sec)
cr>

Consumer lag (screenshot omitted)

Hi @Jun_Zhou,

This is not easy to diagnose asynchronously.
A couple of things to take a look at:

  • Are there still any INSERT statements coming through at all? (SELECT DATE_TRUNC('minute', started), COUNT(*) FROM sys.jobs_log WHERE classification['type'] = 'INSERT' GROUP BY 1 ORDER BY 1 DESC)
  • Do you see any failed INSERT statements in the jobs log (SELECT * FROM sys.jobs_log WHERE error IS NOT NULL ORDER BY started DESC)?
  • Is the CrateDB cluster healthy? (SELECT * FROM sys.node_checks WHERE passed = FALSE)

Best
Niklas

@hammerhead, I have checked the things you mentioned. Although there are failed INSERT statements, those were issued by Prometheus, not Kafka Connect, and the cluster is healthy.

cr> SELECT DATE_TRUNC('minute', started), COUNT(*) FROM sys.jobs_log WHERE classification['type'] = 'INSERT' GROUP BY 1 ORDER BY 1 DESC;
+-------------------------------+----------+
| date_trunc('minute', started) | count(*) |
+-------------------------------+----------+
|                 1742777880000 |      245 |
|                 1742777820000 |      284 |
|                 1742777760000 |      280 |
|                 1742777700000 |      283 |
|                 1742777640000 |      283 |
|                 1742777580000 |      283 |
|                 1742777520000 |      283 |
|                 1742777460000 |      285 |
|                 1742777400000 |      280 |
|                 1742777340000 |      283 |
|                 1742777280000 |      280 |
|                 1742777220000 |      282 |
|                 1742777160000 |      279 |
|                 1742777100000 |      284 |
|                 1742777040000 |      284 |
|                 1742776980000 |      278 |
|                 1742776920000 |      281 |
|                 1742776860000 |      281 |
|                 1742776800000 |      277 |
|                 1742776740000 |      282 |
|                 1742776680000 |      284 |
|                 1742776620000 |      279 |
|                 1742776560000 |      282 |
|                 1742776500000 |      284 |
|                 1742776440000 |      287 |
|                 1742776380000 |      278 |
|                 1742776320000 |      281 |
|                 1742776260000 |      281 |
|                 1742776200000 |      289 |
|                 1742776140000 |      289 |
|                 1742776080000 |      284 |
|                 1742776020000 |      286 |
|                 1742775960000 |       18 |
+-------------------------------+----------+
SELECT 33 rows in set (0.071 sec)
cr> select * from sys.jobs_log where stmt not like '%metrics%' and error IS NOT NULL ORDER BY started DESC;
+-------------------------------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+--------------------------------------------------------------+---------------+----------------------------------------------+----------+
| classification                      |         ended | error                                                                                                                               | id                                   | node                                                         |       started | stmt                                         | username |
+-------------------------------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+--------------------------------------------------------------+---------------+----------------------------------------------+----------+
| {"labels": [], "type": "UNDEFINED"} | 1742268900512 | line 1:6: mismatched input '"6d3b23c4-1e5e-a8d9-f18d-5f85fdcaf849"' expecting {'ALL', '?', '$', STRING, BEGIN_DOLLAR_QUOTED_STRING} | 55e347d2-7a65-4395-a667-59d4f2de7f8e | {"id": "04dtyt5LRfiGggwsJBwOVg", "name": "t-infra-tsdb-002"} | 1742268900512 | kill "6d3b23c4-1e5e-a8d9-f18d-5f85fdcaf849"; | crate    |
+-------------------------------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+--------------------------------------------------------------+---------------+----------------------------------------------+----------+
SELECT 1 row in set (0.170 sec)

cr> select * from sys.jobs_log where stmt like '%INSERT%' and error IS NOT NULL ORDER BY started DESC;
+-------------------------------------+---------------+---------------------------------------------------------+--------------------------------------+--------------------------------------------------------------+---------------+---------------------------------------------------------------------------------------------------------------------------------------+----------+
| classification                      |         ended | error                                                   | id                                   | node                                                         |       started | stmt                                                                                                                                  | username |
+-------------------------------------+---------------+---------------------------------------------------------+--------------------------------------+--------------------------------------------------------------+---------------+---------------------------------------------------------------------------------------------------------------------------------------+----------+
| {"labels": [], "type": "UNDEFINED"} | 1742777515958 | line 1:132: extraneous input ')' expecting {<EOF>, ';'} | 1fad472d-4436-1d3e-652b-51ee63be95aa | {"id": "8wGqpC2ITQmOTwp0Ldo_DQ", "name": "t-infra-tsdb-001"} | 1742777515958 | SELECT DATE_TRUNC('minute', started), COUNT(*) FROM sys.jobs_log WHERE classification['type'] = 'INSERT' GROUP BY 1 ORDER BY 1 DESC); | crate    |
+-------------------------------------+---------------+---------------------------------------------------------+--------------------------------------+--------------------------------------------------------------+---------------+---------------------------------------------------------------------------------------------------------------------------------------+----------+
SELECT 1 row in set (0.134 sec)
cr> SELECT * FROM sys.node_checks WHERE passed = FALSE;
+--------------+-------------+----+---------+--------+----------+
| acknowledged | description | id | node_id | passed | severity |
+--------------+-------------+----+---------+--------+----------+
+--------------+-------------+----+---------+--------+----------+
SELECT 0 rows in set (0.006 sec)

@hammerhead I have identified the root cause of the issue: the topics.regex parameter does not accept a comma-separated list of regular expressions. It is evaluated as a single regex pattern, which must match all desired topics on its own. Thanks again.
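For reference, a single pattern covering the three schemas could use regex alternation instead of commas. This is a sketch, not the configuration actually deployed; it assumes topic names of the form `pgdb_biz.<schema>.<table>` as shown in the thread, and that the whole topic name must match the pattern:

```json
{
	"topics.regex": "pgdb_biz\\.(so|qto|dock)\\.[^.]+"
}
```

Note that the literal dots are escaped (`\\.` in JSON), unlike in the original `topics.regex`, where unescaped `.` matched any character.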
