Hello folks:
Trying to ingest some data from a Kafka topic into CrateDB using Telegraf v1.22. All the pods are running in the same AKS cluster. As a matter of fact, I was able to do the same thing for a topic that carries metric data in Influx line protocol format.
The following is a sample message from the Kafka topic:
{"SOURCE":"s_net_kafka","PROGRAM":"","PRIORITY":"debug","MESSAGE":"","LEGACY_MSGHDR":"","HOST_FROM":"aks-agentpool-38106399-vmss000000","HOSTNAME":"syslog-server-57dfc74f44-hgpcv","HOST":"aks-agentpool-38106399-vmss000000","FACILITY":"syslog","DATE":"Aug 25 14:35:37"}
I have the Telegraf pod running in the same namespace as Kafka, with:
- input plugin: kafka_consumer
- output plugin: cratedb
I see that Telegraf created a table for the data, but no messages are being ingested, and I could not locate any useful logs in any of the CrateDB StatefulSet pods.
This is a lab system, and the output plugin config in Telegraf is as follows:
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
# Configuration for CrateDB to send metrics to.
[[outputs.cratedb]]
# A github.com/jackc/pgx/v4 connection string.
# See https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig
url = "postgres://crate@crate-external-service.crate/doc?sslmode=disable"
# Timeout for all CrateDB queries.
timeout = "5s"
# Name of the table to store metrics in.
table = "syslog"
# If true, and the metrics table does not exist, create it automatically.
table_create = true
# The character(s) to replace any '.' in an object key with
key_separator = "_"
The input plugin config is as follows:
# Read metrics from Kafka topics
[[inputs.kafka_consumer]]
## Kafka brokers.
brokers = ["kafka-service:9092"]
## Topics to consume.
topics = ["syslog"]
## When set this tag will be added to all metrics with the topic as the value.
# topic_tag = ""
## Optional Client id
# client_id = "Telegraf"
## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Must be 0.10.2.0 or greater.
## ex: version = "1.1.0"
# version = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"
## Optional SASL:
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
## (defaults to PLAIN)
# sasl_mechanism = ""
## used if sasl_mechanism is GSSAPI (experimental)
# sasl_gssapi_service_name = ""
## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
# sasl_gssapi_auth_type = "KRB5_USER_AUTH"
# sasl_gssapi_kerberos_config_path = "/"
# sasl_gssapi_realm = "realm"
# sasl_gssapi_key_tab_path = ""
# sasl_gssapi_disable_pafxfast = false
## used if sasl_mechanism is OAUTHBEARER (experimental)
# sasl_access_token = ""
## SASL protocol version. When connecting to Azure EventHub set to 0.
# sasl_version = 1
## Disable Kafka metadata full fetch
# metadata_full = false
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0
## Initial offset position; one of "oldest" or "newest".
offset = "newest"
## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Maximum amount of time the consumer should take to process messages. If
## the debug log prints messages from sarama about 'abandoning subscription
## to [topic] because consuming was taking too long', increase this value to
## longer than the time taken by the output plugin(s).
##
## Note that the effective timeout could be between 'max_processing_time' and
## '2 * max_processing_time'.
# max_processing_time = "100ms"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"
Is there a specific Telegraf config that I should be following for this?
How does CrateDB know which message field is the timestamp?
Thanks