omhiredis: Redis Output Module¶
Module Name: | omhiredis |
Author: | Brian Knox <bknox@digitalocean.com> |
Contributors: | Theo Bertin <theo.bertin@advens.fr> |
Purpose¶
This module provides native support for writing to Redis, using the hiredis client library.
Configuration Parameters¶
Note
Parameter names are case-insensitive.
Action Parameters¶
Server¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | none | no | none |
Name or address of the Redis server
ServerPort¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
integer | 6379 | no | none |
Port of the Redis server if the server is not listening on the default port.
ServerPassword¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | none | no | none |
Password used to authenticate against a password-protected Redis server, for example when pushing messages across networks and datacenters. This parameter is optional; if it is not provided, the AUTH command won’t be sent to the server.
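A minimal sketch of an authenticated connection, assuming a queue-mode action. The action name, the password value and the queue name are placeholders, not values from a real deployment:

```
module(load="omhiredis")
action(
    name="push_redis_auth"
    type="omhiredis"
    server="my-redis-server.example.com"
    serverport="6379"
    # placeholder secret; when set, omhiredis sends AUTH with this value
    serverpassword="changeme"
    mode="queue"
    key="my_queue")
```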
Mode¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | template | no | none |
Mode to run the output action in: “queue”, “publish”, “set” or “stream”. If not supplied, the original “template” mode is used.
Note
Due to a config parsing bug in 8.13, explicitly setting this to “template” mode will result in a config parsing error.
If mode is “set”, omhiredis will send SET commands. If the “expiration” parameter is provided (see below), omhiredis will send SETEX commands instead.
If mode is “stream”, logs will be sent using XADD. In that case, the template-formatted message is inserted into the msg field of the new stream entry (the field name can be changed with the stream.outField parameter).
Template¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | RSYSLOG_ForwardFormat | no | none |
Warning
Template is required if using “template” mode.
Key¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | none | no | none |
Key is required if using “publish”, “queue” or “set” modes.
Dynakey¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
binary | off | no | none |
If set to “on”, the key value will be considered a template by Rsyslog. Useful when dynamic key generation is desired.
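As an illustration, the following sketch pushes messages to one Redis list per program. The template name and the key layout are hypothetical:

```
module(load="omhiredis")
# hypothetical template: yields keys such as "logs-sshd"
template(name="per_program_key" type="string" string="logs-%programname%")
action(
    type="omhiredis"
    server="my-redis-server.example.com"
    mode="queue"
    # with dynakey="on", "key" names the template above instead of a literal key
    key="per_program_key"
    dynakey="on")
```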
Userpush¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
binary | off | no | none |
If set to “on”, RPUSH is used instead of LPUSH. If unset or “off”, LPUSH is used.
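A short sketch of a queue-mode action that appends to the tail of the list. The server and queue names are placeholders reused from the examples further down:

```
module(load="omhiredis")
action(
    type="omhiredis"
    server="my-redis-server.example.com"
    mode="queue"
    key="my_queue"
    # push with RPUSH (tail of the list) instead of the default LPUSH
    userpush="on")
```

With RPUSH, a consumer that wants the oldest message first would read with LPOP rather than RPOP.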
Expiration¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
number | 0 | no | none |
Only applicable with mode “set”. Specifies an expiration, in seconds, for the keys set by omhiredis. If this parameter is not specified, the value is 0 and keys never expire.
stream.outField¶
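type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | msg | no | none |
Only applicable with mode “stream”. Name of the stream entry field that receives the template-formatted message.
Note
Currently, the module cannot use the full message object, so it can only insert templated messages into a single, specific field of a stream entry.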
stream.capacityLimit¶
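type | default | mandatory | obsolete legacy directive |
---|---|---|---|
positive integer | 0 | no | none |
Only applicable with mode “stream”. Sets a hard limit on the number of entries the stream may hold before entries are dropped. The limit is approximate (see the Redis documentation on MAXLEN and the ‘~’ option).
Warning
This parameter has no way to check whether the deleted entries have been ACK’ed or even read. Set it only if you are sure the insertion rate is lower than the dequeuing rate, to avoid losing entries!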
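A minimal sketch of a capped output stream, independent of imhiredis. The limit of 10000 is an arbitrary illustrative value:

```
module(load="omhiredis")
action(
    type="omhiredis"
    server="my-redis-server.example.com"
    mode="stream"
    key="stream_output"
    # illustrative value: keep roughly the last 10000 entries
    stream.capacityLimit="10000")
```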
stream.ack¶
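type | default | mandatory | obsolete legacy directive |
---|---|---|---|
boolean | off | no | none |
Only applicable with mode “stream”. If set to “on”, omhiredis acknowledges the stream entry identified by stream.keyAck (stream), stream.groupAck (consumer group) and stream.indexAck (entry index). This is mainly useful together with imhiredis in stream mode (see Example 8).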
stream.del¶
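type | default | mandatory | obsolete legacy directive |
---|---|---|---|
boolean | off | no | none |
Only applicable with mode “stream”. If set to “on”, omhiredis deletes the processed entry, identified by stream.keyAck and stream.indexAck, from its source stream once the message has been inserted (see Example 9). It can also be used as a complement to ACK’ing.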
stream.keyAck¶
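type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word |  | no | none |
Key of the stream whose entries are acknowledged (stream.ack) or deleted (stream.del).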
stream.dynaKeyAck¶
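type | default | mandatory | obsolete legacy directive |
---|---|---|---|
boolean | off | no | none |
If set to “on”, the value of stream.keyAck is treated as the name of a template.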
stream.groupAck¶
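type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word |  | no | none |
Name of the consumer group used when acknowledging entries (stream.ack).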
stream.dynaGroupAck¶
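type | default | mandatory | obsolete legacy directive |
---|---|---|---|
boolean | off | no | none |
If set to “on”, the value of stream.groupAck is treated as the name of a template.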
stream.indexAck¶
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word |  | no | none |
Index (ID) of the stream entry to acknowledge or delete.
stream.dynaIndexAck¶
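type | default | mandatory | obsolete legacy directive |
---|---|---|---|
boolean | off | no | none |
If set to “on”, the value of stream.indexAck is treated as the name of a template.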
Examples¶
Example 1: Template mode¶
In “template” mode, the string constructed by the template is sent to Redis as a command.
Note
This mode has problems with strings that contain spaces, so a full syslog message won’t work correctly. In this mode, the template argument is required and the key argument has no effect.
module(load="omhiredis")
template(
name="program_count_tmpl"
type="string"
string="HINCRBY progcount %programname% 1")
action(
name="count_programs"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="template"
template="program_count_tmpl")
Results¶
Here’s an example redis-cli session where we HGETALL the counts:
> redis-cli
127.0.0.1:6379> HGETALL progcount
1) "rsyslogd"
2) "35"
3) "rsyslogd-pstats"
4) "4302"
Example 2: Queue mode¶
In “queue” mode, the syslog message is pushed into a Redis list at “key”, using the LPUSH command. If a template is not supplied, the plugin will default to the RSYSLOG_ForwardFormat template.
module(load="omhiredis")
action(
name="push_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="queue"
key="my_queue")
Results¶
Here’s an example redis-cli session where we RPOP from the queue:
> redis-cli
127.0.0.1:6379> RPOP my_queue
"<46>2015-09-17T10:54:50.080252-04:00 myhost rsyslogd: [origin software=\"rsyslogd\" swVersion=\"8.13.0.master\" x-pid=\"6452\" x-info=\"http://www.rsyslog.com\"] start"
127.0.0.1:6379>
Example 3: Publish mode¶
In “publish” mode, the syslog message is published to the Redis channel set by “key”. If a template is not supplied, the plugin will default to the RSYSLOG_ForwardFormat template.
module(load="omhiredis")
action(
name="publish_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="publish"
key="my_channel")
Results¶
Here’s an example redis-cli session where we SUBSCRIBE to the channel:
> redis-cli
127.0.0.1:6379> subscribe my_channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "my_channel"
3) (integer) 1
1) "message"
2) "my_channel"
3) "<46>2015-09-17T10:55:44.486416-04:00 myhost rsyslogd-pstats: {\"name\":\"imuxsock\",\"origin\":\"imuxsock\",\"submitted\":0,\"ratelimit.discarded\":0,\"ratelimit.numratelimiters\":0}"
Example 4: Set mode¶
In “set” mode, the syslog message is stored as the value of the Redis key named by “key”. If a template is not supplied, the plugin will default to the RSYSLOG_ForwardFormat template.
module(load="omhiredis")
action(
name="set_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="set"
key="my_key")
Results¶
Here’s an example redis-cli session where we get the key:
> redis-cli
127.0.0.1:6379> get my_key
"<46>2019-12-17T20:16:54.781239+00:00 localhost rsyslogd-pstats: { \"name\": \"main Q\", \"origin\": \"core.queue\",
\"size\": 3, \"enqueued\": 7, \"full\": 0, \"discarded.full\": 0, \"discarded.nf\": 0, \"maxqsize\": 3 }"
127.0.0.1:6379> ttl my_key
(integer) -1
Example 5: Set mode with expiration¶
In “set” mode, when “expiration” is set to a positive integer, the syslog message is stored as the value of the Redis key named by “key”, and the key expires after “expiration” seconds. If a template is not supplied, the plugin will default to the RSYSLOG_ForwardFormat template.
module(load="omhiredis")
action(
name="set_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="set"
key="my_key"
expiration="10")
Results¶
Here’s an example redis-cli session where we get the key and test the expiration:
> redis-cli
127.0.0.1:6379> get my_key
"<46>2019-12-17T20:16:54.781239+00:00 localhost rsyslogd-pstats: { \"name\": \"main Q\", \"origin\": \"core.queue\",
\"size\": 3, \"enqueued\": 7, \"full\": 0, \"discarded.full\": 0, \"discarded.nf\": 0, \"maxqsize\": 3 }"
127.0.0.1:6379> ttl my_key
(integer) 10
127.0.0.1:6379> ttl my_key
(integer) 3
127.0.0.1:6379> ttl my_key
(integer) -2
127.0.0.1:6379> get my_key
(nil)
Example 6: Set mode with dynamic key¶
In any mode with “key” defined and “dynakey” as “on”, the key used during operation will be dynamically generated by Rsyslog using templating.
module(load="omhiredis")
template(name="example-template" type="string" string="%hostname%")
action(
name="set_redis"
server="my-redis-server.example.com"
serverport="6379"
type="omhiredis"
mode="set"
key="example-template"
dynakey="on")
Results¶
Here’s an example redis-cli session where we get the dynamic key:
> redis-cli
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> keys *
1) "localhost"
Example 7: “Simple” stream mode¶
module(load="omhiredis")
template(name="example-template" type="string" string="%hostname%")
action(
type="omhiredis"
server="my-redis-server.example.com"
serverport="6379"
template="example-template"
mode="stream"
key="stream_output"
stream.outField="data")
Results¶
Here’s an example redis-cli session where we read the newly inserted stream entry:
> redis-cli
127.0.0.1:6379> XLEN stream_output
1
127.0.0.1:6379> xread STREAMS stream_output 0
1) 1) "stream_output"
   2) 1) 1) "1684507855284-0"
         2) 1) "data"
            2) "localhost"
Example 8: Get from a stream with imhiredis, then insert in another one with omhiredis¶
module(load="imhiredis")
module(load="omhiredis")
template(name="redisJsonMessage" type="list") {
property(name="$!output")
}
template(name="indexTemplate" type="list") {
property(name="$.redis!index")
}
# Not used in this example, but can be used to replace the static declarations in omhiredis' configuration below
template(name="groupTemplate" type="list") {
property(name="$.redis!group")
}
template(name="keyTemplate" type="list") {
property(name="$.redis!stream")
}
input(type="imhiredis"
server="127.0.0.1"
port="6379"
mode="stream"
key="stream_input"
stream.consumerGroup="group1"
stream.consumerName="consumer1"
stream.consumerACK="off"
ruleset="receive_redis")
ruleset(name="receive_redis") {
action(type="omhiredis"
server="127.0.0.1"
serverport="6379"
mode="stream"
key="stream_output"
stream.ack="on"
# The key and group values are set statically, but the index value is taken from imhiredis metadata
stream.dynaKeyAck="off"
stream.keyAck="stream_input"
stream.dynaGroupAck="off"
stream.groupAck="group1"
stream.indexAck="indexTemplate"
stream.dynaIndexAck="on"
template="redisJsonMessage"
)
}
Results¶
Here’s an example redis-cli session where we get the pending entries at the end of the log re-insertion:
> redis-cli
127.0.0.1:6379> XINFO GROUPS stream_input
1)  1) "name"
    2) "group1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1684509391900-0"
    9) "entries-read"
   10) (integer) 1
   11) "lag"
   12) (integer) 0
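The groupTemplate and keyTemplate templates declared in the configuration above are not used by its action. A possible variant of the ruleset, sketched here under the assumption that imhiredis fills $.redis!stream and $.redis!group for each entry (as the template definitions suggest), takes all three acknowledgement values from message metadata:

```
ruleset(name="receive_redis") {
    action(type="omhiredis"
        server="127.0.0.1"
        serverport="6379"
        mode="stream"
        key="stream_output"
        stream.ack="on"
        # key, group and index are all resolved through templates
        stream.dynaKeyAck="on"
        stream.keyAck="keyTemplate"
        stream.dynaGroupAck="on"
        stream.groupAck="groupTemplate"
        stream.dynaIndexAck="on"
        stream.indexAck="indexTemplate"
        template="redisJsonMessage"
    )
}
```

This form avoids repeating the input stream and group names in the action, which may help when several imhiredis inputs share one ruleset.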
Example 9: Ensuring streams don’t grow indefinitely¶
Two parameters help keep streams from growing indefinitely:
stream.del deletes processed entries (it can also be used as a complement to ACK’ing).
stream.capacityLimit sets a hard limit on the number of entries that can be inserted before entries are dropped.
module(load="imhiredis")
module(load="omhiredis")
template(name="redisJsonMessage" type="list") {
property(name="$!output")
}
template(name="indexTemplate" type="list") {
property(name="$.redis!index")
}
template(name="keyTemplate" type="list") {
property(name="$.redis!stream")
}
input(type="imhiredis"
server="127.0.0.1"
port="6379"
mode="stream"
key="stream_input"
ruleset="receive_redis")
ruleset(name="receive_redis") {
action(type="omhiredis"
server="127.0.0.1"
serverport="6379"
mode="stream"
key="stream_output"
stream.capacityLimit="1000000"
stream.del="on"
stream.dynaKeyAck="on"
stream.keyAck="keyTemplate"
stream.dynaIndexAck="on"
stream.indexAck="indexTemplate"
template="redisJsonMessage"
)
}
Results¶
Here, the result of this configuration is:
entries are deleted from the source stream stream_input after being inserted by omhiredis to stream_output
stream_output won’t hold more than (approximately) a million entries at a time
Warning
The stream.capacityLimit value is an approximate maximum! See the Redis documentation on MAXLEN and the ‘~’ option to understand how it works.
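Since stream.del can complement ACK’ing, both can presumably be enabled on the same action. The sketch below assumes the imhiredis input of Example 8 (consumer group group1 on stream_input) together with the indexTemplate and redisJsonMessage templates; treat it as an untested illustration rather than a reference configuration:

```
ruleset(name="receive_redis") {
    action(type="omhiredis"
        server="127.0.0.1"
        serverport="6379"
        mode="stream"
        key="stream_output"
        # acknowledge the source entry for group1, then delete it
        stream.ack="on"
        stream.del="on"
        stream.keyAck="stream_input"
        stream.groupAck="group1"
        stream.dynaIndexAck="on"
        stream.indexAck="indexTemplate"
        template="redisJsonMessage"
    )
}
```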
Copyright 2008-2023 Rainer Gerhards (Großrinderfeld), and Others.