omkafka: write to Apache Kafka

Module Name:

omkafka

Author:

Rainer Gerhards <rgerhards@adiscon.com>

Available since:

v8.7.0

Purpose

The omkafka plug-in implements an Apache Kafka producer, permitting rsyslog to write data to Kafka.
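A minimal sketch of a working setup, assuming a local broker and a pre-existing topic named "mytopic" (both placeholders):

module(load="omkafka")

action(
  type="omkafka"
  broker=["localhost:9092"]   # assumed broker address
  topic="mytopic"             # assumed pre-existing topic
)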

Configuration Parameters

Note

Parameter names are case-insensitive; CamelCase is recommended for readability.

Action Parameters

Parameter             Summary
-------------------   ------------------------------------------------------
Broker                List of Kafka brokers in host:port form.
Topic                 Kafka topic to produce to.
Key                   Key used for partitioning messages.
DynaKey               Treat key as a template for dynamic partition keys.
DynaTopic             Treat topic as a template for dynamic topic names.
DynaTopic.Cachesize   Number of dynamic topics kept in cache.
Partitions.Auto       Use librdkafka’s automatic partitioning.
Partitions.number     Number of partitions to load-balance across.
Partitions.useFixed   Send all messages to a specific partition.
errorFile             Write failed messages to this JSON file.
statsFile             Write librdkafka statistics JSON to this file.
ConfParam             Arbitrary librdkafka producer options (name=value).
TopicConfParam        Arbitrary librdkafka topic options (name=value).
Template              Template used to format messages for this action.
closeTimeout          Milliseconds to wait for pending messages on shutdown.
resubmitOnFailure     Retry failed messages when Kafka becomes available.
KeepFailedMessages    Persist failed messages for resending after restart.
failedMsgFile         File that stores messages saved by KeepFailedMessages.
statsName             Name of statistics instance for this action.
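To show how several of these parameters interact, here is a hedged sketch of an action that derives the topic name from a template and preserves failed messages across restarts; the template string, broker address, and file paths are illustrative assumptions:

template(name="kafka-topic" type="string" string="logs-%hostname%")

action(
  type="omkafka"
  broker=["kafka.example.com:9092"]                # assumed broker address
  topic="kafka-topic"                              # name of the template above
  dynaTopic="on"                                   # resolve topic per message
  dynaTopic.cachesize="50"
  errorFile="/var/log/rsyslog/kafka-errors.json"   # assumed path
  resubmitOnFailure="on"
  keepFailedMessages="on"
  failedMsgFile="/var/spool/rsyslog/kafka-failed"  # assumed path
)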

Statistic Counter

This plugin maintains global statistics for omkafka that accumulate across all action instances. The statistic origin is named “omkafka”, with the following counters:

  • submitted - number of messages submitted to omkafka for processing. This includes messages whose delivery was acknowledged by the broker as well as messages that failed or were re-submitted from omkafka to librdkafka.

  • maxoutqsize - high-water mark of the output queue size.

  • failures - number of messages that librdkafka failed to deliver. This number is broken down into the counters for the various failure types below.

  • topicdynacache.skipped - count of dynamic topic cache lookups that find an existing topic and skip creating a new one.

  • topicdynacache.miss - count of dynamic topic cache lookups that fail to find an existing topic and end up creating new ones.

  • topicdynacache.evicted - count of dynamic topic cache entry evictions.

  • acked - count of messages that were acknowledged by the Kafka broker. Note that the broker provides two levels of delivery acknowledgement, selected through topicConfParam: the default (acks=1) implies delivery to the leader only, while acks=-1 implies delivery to the leader as well as replication to all brokers. A sketch requesting the stronger guarantee appears after this list.

  • failures_msg_too_large - count of messages dropped by librdkafka because the broker considers the message too large. Note that omkafka may still resubmit them to librdkafka, depending on the resubmitOnFailure option.

  • failures_unknown_topic - count of messages dropped by librdkafka because the broker does not recognize the topic.

  • failures_queue_full - count of messages dropped by librdkafka when its queue becomes full. Note that the default size of the librdkafka queue is 100,000 messages.

  • failures_unknown_partition - count of messages that librdkafka failed to deliver because the broker does not recognize the partition.

  • failures_other - count of all remaining failures that do not fall into any of the above categories.

  • errors_timed_out - count of messages that librdkafka could not deliver within the timeout. These errors cause the action to be suspended, but the messages can be retried depending on the retry options.

  • errors_transport - count of messages that librdkafka could not deliver due to transport errors. These messages can be retried depending on the retry options.

  • errors_broker_down - count of messages that librdkafka could not deliver because it considers the broker inaccessible. These messages can be retried depending on the retry options.

  • errors_auth - count of messages that librdkafka could not deliver due to authentication errors. These messages can be retried depending on the retry options.

  • errors_ssl - count of messages that librdkafka could not deliver due to SSL errors. These messages can be retried depending on the retry options.

  • errors_other - count of the remaining librdkafka errors.

  • rtt_avg_usec - broker round-trip time in microseconds, averaged over all brokers. It is based on the statistics callback window specified through the statistics.interval.ms parameter to librdkafka. The average excludes brokers with a round-trip time below 100 microseconds.

  • throttle_avg_msec - broker throttling time in milliseconds, averaged over all brokers. This is also part of the window statistics delivered by librdkafka. The average excludes brokers with zero throttling time.

  • int_latency_avg_usec - internal librdkafka producer queue latency in microseconds, averaged over all brokers. This is also part of the window statistics; the average excludes brokers with zero internal latency.
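As noted for the acked counter above, the acknowledgement level is set through topicConfParam. A minimal sketch requesting full in-sync-replica acknowledgement, with placeholder broker and topic names:

action(
  type="omkafka"
  broker=["kafka.example.com:9092"]   # placeholder broker
  topic="mytopic"                     # placeholder topic
  topicConfParam=["acks=-1"]          # wait for leader and all replicas
)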

Note that the three window statistics counters above are not safe with multiple clients. When the statistics callback is enabled, for example by setting statistics.interval.ms=60000, omkafka generates an internal log message every minute for the corresponding omkafka action:

2018-03-31T01:51:59.368491+00:00 app1-1.example.com rsyslogd: statscb_window_stats:
handler_name=collections.rsyslog.core#producer-1 replyq=0 msg_cnt=30 msg_size=37986 msg_max=100000
msg_size_max=1073741824 rtt_avg_usec=41475 throttle_avg_msec=0 int_latency_avg_usec=2943224 [v8.32.0]

For multiple actions using the statistics callback, one such record is emitted per action after the specified window period. See https://github.com/edenhill/librdkafka/wiki/Statistics for more details on the statistics callback values. A sketch of enabling the callback follows.
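A minimal sketch of enabling the statistics callback window and writing the raw JSON to a file; the interval, path, and instance name are illustrative assumptions:

action(
  type="omkafka"
  broker=["kafka.example.com:9092"]               # placeholder broker
  topic="mytopic"                                 # placeholder topic
  confParam=["statistics.interval.ms=60000"]      # emit window statistics every minute
  statsFile="/var/log/rsyslog/kafka-stats.json"   # assumed path
  statsName="kafka-stats"                         # assumed instance name
)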

Examples

Using Array Type Parameter

Set a single value

For example, to select “snappy” compression, you can use:

action(type="omkafka" topic="mytopic" confParam="compression.codec=snappy")

which is equivalent to:

action(type="omkafka" topic="mytopic" confParam=["compression.codec=snappy"])

Set multiple values

To specify multiple values, just use the bracket notation and create a comma-delimited list of values as shown here:

action(type="omkafka" topic="mytopic"
       confParam=["compression.codec=snappy",
                  "socket.timeout.ms=5",
                  "socket.keepalive.enable=true"]
      )

Set SASL password from an environment variable

Changed in version 8.2508.0: Backticks in RainerScript support the ${VAR} form and adjacent text. This enables a simpler inline configuration such as:

`echo sasl.password=${KAFKA_PASSWORD}`

Recommended (rsyslog v8.2508.0 and later)

Keep the secret out of rsyslog.conf and inject it via environment. Then build the key=value pair inline with backticks:

# set by your service manager or a secure env file
export KAFKA_PASSWORD='supersecret'
action(
  type="omkafka"
  broker=["kafka.example.com:9093"]
  confParam=[
    "security.protocol=SASL_SSL",
    "sasl.mechanism=SCRAM-SHA-512",
    "sasl.username=myuser",
    `echo sasl.password=${KAFKA_PASSWORD}`
  ]
)

Notes:

  • This relies on the enhanced backtick handling; it is not a general shell. Only the documented backtick subset (notably echo and cat) is supported; a cat-based variant is sketched after these notes.

  • The variable expansion happens at rsyslog parse time, using the process environment of the rsyslog daemon.
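If the secret is kept in a file instead of the environment, the cat form can supply the parameter. A sketch under the assumption that the file holds the complete key=value pair on a single line and is readable only by rsyslog; the path is illustrative:

# /etc/rsyslog.d/kafka.pwd (assumed path, mode 0600) contains exactly one line:
#   sasl.password=supersecret
action(
  type="omkafka"
  broker=["kafka.example.com:9093"]
  confParam=[
    "security.protocol=SASL_SSL",
    "sasl.mechanism=SCRAM-SHA-512",
    "sasl.username=myuser",
    `cat /etc/rsyslog.d/kafka.pwd`
  ]
)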

Older rsyslog versions (before v8.2508.0)

Backticks did not support the ${VAR} form or adjacent text. Inline forms like `echo sasl.password=$KAFKA_PASSWORD` could cause errors such as “missing equal sign in parameter”. Use a pre-composed environment variable that already contains the full key=value pair and echo that:

export KAFKA_PASSWORD='supersecret'
# Pre-compose the full key=value (done *outside* rsyslog)
export SASL_PWDPARAM="sasl.password=${KAFKA_PASSWORD}"
action(
  type="omkafka"
  broker=["kafka.example.com:9093"]
  confParam=[
    "security.protocol=SASL_SSL",
    "sasl.mechanism=SCRAM-SHA-512",
    "sasl.username=myuser",
    `echo $SASL_PWDPARAM`
  ]
)

Security guidance

  • Prefer environment files or service-manager mechanisms with strict permissions over embedding secrets directly in rsyslog.conf; a systemd sketch follows these notes.

  • Process environments may be visible to privileged users (e.g., via /proc); secure host access accordingly.
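As one concrete way to follow this guidance under systemd, a drop-in unit can load the variable from a root-only environment file; all paths and names here are assumptions:

# /etc/systemd/system/rsyslog.service.d/kafka-env.conf (assumed drop-in)
[Service]
EnvironmentFile=/etc/rsyslog.d/kafka.env

# /etc/rsyslog.d/kafka.env (assumed path; mode 0600, owned by root)
KAFKA_PASSWORD=supersecret

# apply the drop-in
systemctl daemon-reload
systemctl restart rsyslog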

