Imhiredis: Redis input plugin

Module Name:

imhiredis

Author:

Jeremie Jourdin <jeremie.jourdin@advens.fr>

Contributors:

Theo Bertin <theo.bertin@advens.fr>

Purpose

Imhiredis is an input module reading arbitrary entries from Redis. It uses the hiredis library to query Redis instances using 3 modes:

Queue mode

The queue mode uses Redis LISTs to push/pop messages to/from lists. It allows simple and efficient uses of Redis as a queueing system, providing both LIFO and FIFO methods.

This mode should be preferred if the user wants to use Redis as a caching system, with one (or many) Rsyslog instances POP’ing out entries.

Warning

This mode was configured to provide optimal performances while not straining Redis, but as imhiredis has to poll the instance some trade-offs had to be made:

  • imhiredis POPs entries by batches of 10 to improve performances (size of batch is configurable via the batchsize parameter)

  • when no entries are left in the list, the module sleeps for 1 second before checking the list again. This means messages might be delayed by as much as 1 second between a push to the list and a pop by imhiredis (entries will still be POP’ed out as fast as possible while the list is not empty)

Channel mode

The subscribe mode uses Redis PUB/SUB system to listen to messages published to Redis’ channels. It allows performant use of Redis as a message broker.

This mode should be preferred to use Redis as a message broker, with zero, one or many subscribers listening to new messages.

Warning

This mode shouldn’t be used if messages are to be reliably processed, as messages published when no Imhiredis is listening will result in the loss of the message.

Stream mode

The stream mode uses Redis Streams system to read entries published to Redis’ streams. It is a good alternative when:
  • sharing work is desired

  • not losing any log (even in the case of a crash) is mandatory

This mode is especially useful to define pools of workers that do various processing along the way, while ensuring not a single log is lost during processing by a worker.

Note

As Redis streams do not insert simple values in keys, but rather fleid/value pairs, this mode can also be useful when handling structured data. This is better shown with the examples for the parameter fields.

This mode also adds additional internal metadata to the message, it won’t be included in json data or regular fields, but

  • $.redis!stream will be added to the message, with the value of the source stream

  • $.redis!index will be added to the message, with the exact ID of the entry

  • $.redis!group will be added in the message (if stream.consumerGroup is set), with the value of the group used to read the entry

  • $.redis!consumer will be added in the message (if stream.consumerName is set), with the value of the consumer name used to read the entry

This is especially useful when used with the omhiredis module, to allow it to get the required information semi-automatically (custom templates will still be required in the user configuration)

Warning

This mode is the most reliable to handle entries stored in Redis, but it might also be the one with the most overhead. Although still minimal, make sure to test the different options and determine if this mode is right for you!

Master/Replica

This module is able to automatically connect to the master instance of a master/replica(s) cluster. Simply providing a valid connection entry point (being the current master or a valid replica), Imhiredis is able to redirect to the master node on startup and when states change between nodes.

Configuration Parameters

Note

Parameter names are case-insensitive

Input Parameters

mode

type

default

mandatory

obsolete legacy directive

word

subscribe

yes

none

Defines the mode to use for the module.
Should be either “subscribe” (Channel mode), “queue” (Queue mode) or “stream” (Stream mode) (case-sensitive).

ruleset

type

default

mandatory

obsolete legacy directive

word

none

no

none

Assign messages from this input to a specific Rsyslog ruleset.

batchsize

type

default

mandatory

obsolete legacy directive

number

10

yes

none

Defines the dequeue batch size for redis pipelining. imhiredis will read “batchsize” elements from redis at a time.

When using the Queue mode, defines the size of the batch to use with LPOP / RPOP.

key

type

default

mandatory

obsolete legacy directive

word

none

yes

none

Defines either the name of the list to use (for Queue mode) or the channel to listen to (for Channel mode).

socketPath

type

default

mandatory

obsolete legacy directive

word

no

if no server provided

none

Defines the socket to use when trying to connect to Redis. Will be ignored if both server and socketPath are given.

server

type

default

mandatory

obsolete legacy directive

ip

127.0.0.1

if no socketPath provided

none

The Redis server’s IP to connect to.

port

type

default

mandatory

obsolete legacy directive

number

6379

no

none

The Redis server’s port to use when connecting via IP.

password

type

default

mandatory

obsolete legacy directive

word

none

no

none

The password to use when connecting to a Redis node, if necessary.

uselpop

type

default

mandatory

obsolete legacy directive

boolean

no

no

none

When using the Queue mode, defines if imhiredis should use a LPOP instruction instead of a RPOP (the default).
Has no influence on the Channel mode and will be ignored if set with this mode.

stream.consumerGroup

type

default

mandatory

obsolete legacy directive

word

no

none

When using the Stream mode, defines a consumer group name to use (see the XREADGROUP documentation for details). This parameter activates the use of XREADGROUP commands, in replacement to simple XREADs.
Has no influence in the other modes (queue or channel) and will be ignored.

Note

If this parameter is set, stream.consumerName should also be set

stream.consumerName

type

default

mandatory

obsolete legacy directive

word

no

none

When using the Stream mode, defines a consumer name to use (see the XREADGROUP documentation for details). This parameter activates the use of XREADGROUP commands, in replacement to simple XREADs.
Has no influence in the other modes (queue or channel) and will be ignored.

Note

If this parameter is set, stream.consumerGroup should also be set

stream.readFrom

type

default

mandatory

obsolete legacy directive

word

$

no

none

When using the Stream mode, defines the starting ID for XREAD/XREADGROUP commands (can also use special IDs, see documentation).
Has no influence in the other modes (queue or channel) and will be ignored.

stream.consumerACK

type

default

mandatory

obsolete legacy directive

boolean

on

no

none

When using Stream mode with stream.consumerGroup and stream.consumerName, determines if the module should directly acknowledge the ID once read from the Consumer Group.
Has no influence in the other modes (queue or channel) and will be ignored.

Note

When using Consumer Groups and imhiredis, omhiredis can also integrate with this workflow to acknowledge a processed message once put back in another stream (or somewhere else). This parameter is then useful set to off to let the omhiredis module acknowledge the input ID once the message is correctly sent.

stream.autoclaimIdleTime

type

default

mandatory

obsolete legacy directive

positive number

0

no

none

When using Stream mode with stream.consumerGroup and stream.consumerName, determines if the module should check for pending IDs that exceed this time (in milliseconds) to assume the original consumer failed to acknowledge the log and claim them for their own (see the redis ducumentation on this subject for more details on how that works).
Has no influence in the other modes (queue or channel) and will be ignored.

Note

If this parameter is set, the AUTOCLAIM operation will also take into account the specified stream.readFrom parameter. If its value is ‘$’ (default), the AUTOCLAIM commands will use ‘0-0’ as the starting ID.

fields

type

default

mandatory

obsolete legacy directive

array

[]

no

none

When using Stream mode, the module won’t get a simple entry but will instead get hashes, with field/value pairs.
By default, the module will insert every value into their respective field in the $! object, but this parameter can change this behaviour, for each entry the value will be a string where:
  • if the entry begins with a ! or a ., it will be taken as a key to take into the original entry

  • if the entry doesn’t begin with a ! or a ., the value will be taken verbatim

  • in addition, if the value is prefixed with a :<key>: pattern, the value (verbatim or taken from the entry) will be inserted in this specific key (or subkey)

Examples:

configuration

result

["static_value"]

the value “static_value” will be inserted in $!static_value

[":key:static_value"]

the value “static_value” will be inserted in $!key

["!field"]

the value of the field “field” will be inserted in $!field

[":key!subkey:!field"]

the value of the field “field” will be inserted in $!key!subkey

See also

Help with configuring/using Rsyslog:

See also

Contributing to Rsyslog:

Copyright 2008-2023 Rainer Gerhards (Großrinderfeld), and Others.