Shards of Prometheus metrics table are in an underreplicated state

We are using the cratedb-prometheus-adapter to write data to CrateDB 5.6.2, and the shards of the table partitions are in an underreplicated state. For details please see the figure below. We couldn't find related solutions in the community, so any suggestion would be appreciated; thanks in advance.

The crate-infra.log shows:

[2024-02-28T10:03:07,265][WARN ][o.e.c.r.a.AllocationService] [t-cratedb-s-004] tionAction$AsyncReplicaAction.lambda$onResponse$0(TransportReplicationAction.java:561)
        at org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:99)
        at org.elasticsearch.action.support.replication.TransportWriteAction$WriteReplicaResult$1.onSuccess(TransportWriteAction.java:207)
        at org.elasticsearch.action.support.replication.TransportWriteAction$AsyncAfterWriteAction.maybeFinish(TransportWriteAction.java:289)
        at org.elasticsearch.action.support.replication.TransportWriteAction$AsyncAfterWriteAction.run(TransportWriteAction.java:303)
        at org.elasticsearch.action.support.replication.TransportWriteAction$WriteReplicaResult.runPostReplicaActions(TransportWriteAction.java:214)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.onResponse(TransportReplicationAction.java:558)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.onResponse(TransportReplicationAction.java:536)
        at org.elasticsearch.index.shard.IndexShard.lambda$innerAcquireReplicaOperationPermit$27(IndexShard.java:2925)
        at org.elasticsearch.action.ActionListener$3.onResponse(ActionListener.java:127)
        at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:292)
        at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:242)
        at org.elasticsearch.index.shard.IndexShard.lambda$acquireReplicaOperationPermit$25(IndexShard.java:2860)
        at org.elasticsearch.index.shard.IndexShard.innerAcquireReplicaOperationPermit(IndexShard.java:2964)
        at org.elasticsearch.index.shard.IndexShard.acquireReplicaOperationPermit(IndexShard.java:2859)
        at org.elasticsearch.action.support.replication.TransportReplicationAction.acquireReplicaOperationPermit(TransportReplicationAction.java:881)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.doRun(TransportReplicationAction.java:629)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at org.elasticsearch.action.support.replication.TransportReplicationAction.handleReplicaRequest(TransportReplicationAction.java:521)
        at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:59)
        at org.elasticsearch.transport.InboundHandler$RequestHandler.doRun(InboundHandler.java:331)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.lang.Thread.run(Thread.java:1583)
Caused by: [.partitioned.metrics.04732dpg74o3ee1k60o30c1g/OKwmMNp7T1uLjDcGj1wDag][[.partitioned.metrics.04732dpg74o3ee1k60o30c1g][3]] org.elasticsearch.index.translog.TranslogException: Failed to write operation [Index{id='Aw0xNzA5MDg1Mjg2MzMxEGI0ZWJkYWMxNzFkZTlkYjUNMTcwOTA3ODQwMDAwMA==', seqNo=-2, primaryTerm=0, version=-3, autoGeneratedIdTimestamp=-1}]
        at org.elasticsearch.index.translog.Translog.add(Translog.java:526)
        at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:941)
        at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:806)
        at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:778)
        at io.crate.execution.dml.upsert.TransportShardUpsertAction.processRequestItemsOnReplica(TransportShardUpsertAction.java:448)
        at io.crate.execution.dml.upsert.TransportShardUpsertAction.processRequestItemsOnReplica(TransportShardUpsertAction.java:94)
        at io.crate.execution.dml.TransportShardAction$2.call(TransportShardAction.java:119)
        at io.crate.execution.dml.TransportShardAction$2.call(TransportShardAction.java:116)
        at io.crate.execution.dml.TransportShardAction.wrapOperationInKillable(TransportShardAction.java:130)
        at io.crate.execution.dml.TransportShardAction.shardOperationOnReplica(TransportShardAction.java:122)
        at io.crate.execution.dml.TransportShardAction.shardOperationOnReplica(TransportShardAction.java:53)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.onResponse(TransportReplicationAction.java:557)
        ... 18 more
Caused by: java.lang.IllegalArgumentException: sequence number must be assigned
        at org.elasticsearch.index.seqno.SequenceNumbers.min(SequenceNumbers.java:90)
        at org.elasticsearch.index.translog.TranslogWriter.add(TranslogWriter.java:210)
        at org.elasticsearch.index.translog.Translog.add(Translog.java:519)
        ... 29 more
], allocation_status[no_attempt]], expected_shard_size[314811200]], markAsStale [true], failure [org.elasticsearch.transport.RemoteTransportException: [t-cratedb-s-005][10.10.22.215:4300][internal:crate:sql/data/write[r]]
Caused by: org.apache.lucene.store.AlreadyClosedException: [.partitioned.metrics.04732dpg74o3ee1k60o30c1g][3] engine is closed

Hi @Jun_Zhou, thanks for reporting.

Just to quickly check one probable source of issues: do you do dynamic schema updates, i.e. do you create new columns/sub-columns by either

  1. inserting unknown columns into a table with column_policy = dynamic, or
  2. inserting unknown columns into an OBJECT with column_policy = dynamic (the default)? A minimal sketch of what I mean is below.
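
Assuming nothing about your setup, a minimal sketch of what I mean (table and column names are made up):

CREATE TABLE sensor_data (
    "payload" OBJECT(DYNAMIC)
) WITH (column_policy = 'dynamic');

-- case 1: creates the top-level column "new_top_level_column" on the fly
INSERT INTO sensor_data ("new_top_level_column") VALUES (42);

-- case 2: creates the sub-column "payload"['new_sub_column'] on the fly
INSERT INTO sensor_data ("payload") VALUES ({new_sub_column = 1});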

Hi @Baur, I created the metrics table with the DDL below. The labels column is an OBJECT with DYNAMIC policy, and the column_policy of the metrics table is strict, which is the default value.

CREATE TABLE "metrics" (
    "timestamp" TIMESTAMP,
    "labels_hash" STRING,
    "labels" OBJECT(DYNAMIC),
    "value" DOUBLE,
    "valueRaw" LONG,
    "day__generated" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
    PRIMARY KEY ("timestamp", "labels_hash", "day__generated")
  ) PARTITIONED BY ("day__generated");
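
For reference, a typical write against this table looks roughly like the following (values are made up); "day__generated" is not supplied explicitly, it is derived from "timestamp" and selects the target partition:

-- illustrative values only
INSERT INTO metrics ("timestamp", "labels_hash", "labels", "value", "valueRaw")
VALUES (1709078400000, 'abc123', {job = 'node_exporter'}, 0.42, 42);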

Thanks for the update.

  1. How many nodes do you have?

  2. How often do you run update/insert statements that include values for the "labels" column? If possible, could you please share an example of the exact statement?

  3. Could you show the output of

select node['name'], size, partition_ident, recovery, state, routing_state, seq_no_stats, translog_stats, flush_stats from sys.shards sh where sh.table_name = 'metrics'?

  1. There are five nodes.
  2. The data come from Prometheus via the cratedb-prometheus-adapter.
  3. Please see the attachment.
    shards.csv.txt (40.4 KB)

Thanks for the update.

On 2 - do you use the stock version of the adapter, or some custom modification?

Asking since I saw the thread Prometheus failed to write data into CrateDB after encountering error 'A document with the same primary key exists already' - #2 by proddata, with a suggestion to change the query to INSERT ON CONFLICT.

I modified the code to support upsert; the issue may be related to this modification.

INSERT INTO metrics ("labels", "labels_hash", "timestamp", "value", "valueRaw") VALUES ($1, $2, $3, $4, $5)
ON CONFLICT ("timestamp", "labels_hash", "day__generated")
DO UPDATE SET
   "value" = excluded."value",
   "valueRaw" = excluded."valueRaw";

Hi, thanks for the update.

I tried imitating inserts with PK conflicts and updating the query to the ... ON CONFLICT version, but couldn't reproduce the issue locally.
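
Roughly, what I tried (simplified, values are made up) was inserting the same primary key twice so that the second statement resolves via the conflict clause:

INSERT INTO metrics ("labels", "labels_hash", "timestamp", "value", "valueRaw")
VALUES ({job = 'test'}, 'hash-1', 1709078400000, 1.0, 1)
ON CONFLICT ("timestamp", "labels_hash", "day__generated")
DO UPDATE SET "value" = excluded."value", "valueRaw" = excluded."valueRaw";

-- same primary key again, takes the ON CONFLICT ... DO UPDATE path
INSERT INTO metrics ("labels", "labels_hash", "timestamp", "value", "valueRaw")
VALUES ({job = 'test'}, 'hash-1', 1709078400000, 2.0, 2)
ON CONFLICT ("timestamp", "labels_hash", "day__generated")
DO UPDATE SET "value" = excluded."value", "valueRaw" = excluded."valueRaw";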

If you have a self-contained reproduction (steps to reproduce or a script), please let us know.