omkafka: write to Apache Kafka¶
Module Name: |
omkafka |
Author: |
|
Available since: |
v8.7.0 |
Purpose¶
The omkafka plug-in implements an Apache Kafka producer, permitting rsyslog to write data to Kafka.
Configuration Parameters¶
Note
Parameter names are case-insensitive; CamelCase is recommended for readability.
Action Parameters¶
Parameter |
Summary |
---|---|
List of Kafka brokers in host:port form. |
|
Kafka topic to produce to. |
|
Key used for partitioning messages. |
|
Treat key as a template for dynamic partition keys. |
|
Treat topic as a template for dynamic topic names. |
|
Number of dynamic topics kept in cache. |
|
Use librdkafka’s automatic partitioning. |
|
Number of partitions to load-balance across. |
|
Send all messages to a specific partition. |
|
Write failed messages to this JSON file. |
|
Write librdkafka statistics JSON to this file. |
|
Arbitrary librdkafka producer options name=value. |
|
Arbitrary librdkafka topic options name=value. |
|
Template used to format messages for this action. |
|
Milliseconds to wait for pending messages on shutdown. |
|
Retry failed messages when Kafka becomes available. |
|
Persist failed messages for resending after restart. |
|
File that stores messages saved by KeepFailedMessages. |
|
Name of statistics instance for this action. |
Statistic Counter¶
This plugin maintains global statistics for omkafka that accumulate all action instances. The statistic origin is named “omafka” with following counters:
submitted - number of messages submitted to omkafka for processing (with both acknowledged deliveries to broker as well as failed or re-submitted from omkafka to librdkafka).
maxoutqsize - high water mark of output queue size.
failures - number of messages that librdkafka failed to deliver. This number is broken down into counts of various types of failures.
topicdynacache.skipped - count of dynamic topic cache lookups that find an existing topic and skip creating a new one.
topicdynacache.miss - count of dynamic topic cache lookups that fail to find an existing topic and end up creating new ones.
topicdynacache.evicted - count of dynamic topic cache entry evictions.
acked - count of messages that were acknowledged by kafka broker. Note that kafka broker provides two levels of delivery acknowledgements depending on topicConfParam: default (acks=1) implies delivery to the leader only while acks=-1 implies delivery to leader as well as replication to all brokers.
failures_msg_too_large - count of messages dropped by librdkafka when it failed to deliver to the broker because broker considers message to be too large. Note that omkafka may still resubmit to librdkafka depending on resubmitOnFailure option.
failures_unknown_topic - count of messages dropped by librdkafka when it failed to deliver to the broker because broker does not recognize the topic.
failures_queue_full - count of messages dropped by librdkafka when its queue becomes full. Note that default size of librdkafka queue is 100,000 messages.
failures_unknown_partition - count of messages that librdkafka failed to deliver because broker does not recognize a partition.
failures_other - count of all of the rest of the failures that do not fall in any of the above failure categories.
errors_timed_out - count of messages that librdkafka could not deliver within timeout. These errors will cause action to be suspended but messages can be retried depending on retry options.
errors_transport - count of messages that librdkafka could not deliver due to transport errors. These messages can be retried depending on retry options.
errors_broker_down - count of messages that librdkafka could not deliver because it thinks that broker is not accessible. These messages can be retried depending on options.
errors_auth - count of messages that librdkafka could not deliver due to authentication errors. These messages can be retried depending on the options.
errors_ssl - count of messages that librdkafka could not deliver due to ssl errors. These messages can be retried depending on the options.
errors_other - count of rest of librdkafka errors.
rtt_avg_usec - broker round trip time in microseconds averaged over all brokers. It is based on the statistics callback window specified through statistics.interval.ms parameter to librdkafka. Average exclude brokers with less than 100 microseconds rtt.
throttle_avg_msec - broker throttling time in milliseconds averaged over all brokers. This is also a part of window statistics delivered by librdkakfka. Average excludes brokers with zero throttling time.
int_latency_avg_usec - internal librdkafka producer queue latency in microseconds averaged other all brokers. This is also part of window statistics and average excludes brokers with zero internal latency.
Note that three window statics counters are not safe with multiple clients. When statistics callback is enabled, for example, by using statics.callback.ms=60000, omkafka will generate an internal log message every minute for the corresponding omkafka action:
2018-03-31T01:51:59.368491+00:00 app1-1.example.com rsyslogd: statscb_window_stats:
handler_name=collections.rsyslog.core#producer-1 replyq=0 msg_cnt=30 msg_size=37986 msg_max=100000
msg_size_max=1073741824 rtt_avg_usec=41475 throttle_avg_msec=0 int_latency_avg_usec=2943224 [v8.32.0]
For multiple actions using statistics callback, there will be one such record for each action after specified window period. See https://github.com/edenhill/librdkafka/wiki/Statistics for more details on statistics callback values.
Examples¶
Using Array Type Parameter¶
Set a single value¶
For example, to select “snappy” compression, you can use:
action(type="omkafka" topic="mytopic" confParam="compression.codec=snappy")
which is equivalent to:
action(type="omkafka" topic="mytopic" confParam=["compression.codec=snappy"])
Set multiple values¶
To specify multiple values, just use the bracket notation and create a comma-delimited list of values as shown here:
action(type="omkafka" topic="mytopic"
confParam=["compression.codec=snappy",
"socket.timeout.ms=5",
"socket.keepalive.enable=true"]
)
Set SASL password from an environment variable¶
Changed in version 8.2508.0: Backticks in RainerScript support the ${VAR}
form and adjacent text.
This enables a simpler inline configuration such as:
`echo sasl.password=${KAFKA_PASSWORD}`
Recommended (rsyslog v8.2508.0 and later)
Keep the secret out of rsyslog.conf and inject it via environment.
Then build the key=value
pair inline with backticks:
# set by your service manager or a secure env file
export KAFKA_PASSWORD='supersecret'
action(
type="omkafka"
broker=["kafka.example.com:9093"]
confParam=[
"security.protocol=SASL_SSL",
"sasl.mechanism=SCRAM-SHA-512",
"sasl.username=myuser",
`echo sasl.password=${KAFKA_PASSWORD}`
]
)
Notes:
This relies on the enhanced backtick handling; it is not a general shell. Only the documented backtick subset (notably
echo
andcat
) is supported.The variable expansion happens at rsyslog parse time, using the process environment of the rsyslog daemon.
Older rsyslog versions (before v8.2508.0)
Backticks did not understand ${VAR}
or adjacency. Inline forms like
`` echo sasl.password=$KAFKA_PASSWORD `` could cause errors such as
“missing equal sign in parameter”. Use a pre-composed environment variable that
already contains the full key=value
pair and echo that:
export KAFKA_PASSWORD='supersecret'
# Pre-compose the full key=value (done *outside* rsyslog)
export SASL_PWDPARAM="sasl.password=${KAFKA_PASSWORD}"
action(
type="omkafka"
broker=["kafka.example.com:9093"]
confParam=[
"security.protocol=SASL_SSL",
"sasl.mechanism=SCRAM-SHA-512",
"sasl.username=myuser",
`echo $SASL_PWDPARAM`
]
)
Security guidance¶
Prefer environment files or service manager mechanisms with strict permissions over embedding secrets directly in rsyslog.conf.
Process environments may be visible to privileged users (e.g., via
/proc
); secure host access accordingly.
Support: rsyslog Assistant | GitHub Discussions | GitHub Issues: rsyslog source project
Contributing: Source & docs: rsyslog source project
© 2008–2025 Rainer Gerhards and others. Licensed under the Apache License 2.0.