.. _dev-action-threads: .. index:: single: queues; worker threads single: queues; batching single: queues; backpressure single: queues; suspend/resume Action Threads and Queue Engine (Developer-Oriented) ==================================================== .. page-summary-start Developer deep-dive into rsyslog’s action queues, worker threads, batching, backpressure, and suspend/resume behavior. .. page-summary-end Overview -------- The rsyslog queue engine connects producers (inputs/rulesets) with consumers (actions), providing durability, concurrency, backpressure, and error handling. Each action may use a dedicated queue (or direct mode) and one or more worker threads. The engine balances throughput and ordering while coping with failures and shutdown. User-facing queue concepts and parameters are covered in :doc:`../concepts/queues` and :doc:`../rainerscript/queue_parameters`. Terminology ----------- - **Producer**: Component that enqueues a message (main queue, ruleset, or action caller). - **Action**: Output/processing step (e.g., omfwd, omfile). - **Action queue**: The queue attached to an action (or direct mode without an explicit queue). - **Worker thread**: Thread that dequeues batches, executes the action, and finalizes results. - **Batch**: Group of messages processed as a unit (see ``queue.dequeueBatchSize``). - **Backpressure**: Throttling behavior when queue reaches high-water marks. - **Retry/Suspend**: Temporary action pause with backoff when the action fails. - **DA mode**: Disk-assisted in-memory queue (memory primary, spills to disk on pressure). - **Disk-only queue**: Persistent queue on disk for maximum durability. Thread and Queue Topology ------------------------- - Main message queue feeds rulesets. Rulesets evaluate messages and dispatch to actions. - For each action: - **direct** mode: No queue; messages are synchronously handed to the action. - **memory/disk/DA** queue: Messages enter an action queue processed by N workers. - Worker threads scale per action (``queue.workerThreads``) and dequeue in batches. .. mermaid:: flowchart LR Inputs[Input modules] --> MainQ[Main Message Queue]; MainQ --> Rules[Ruleset Processor]; Rules --> A1Q[Action Queue A1]; Rules --> A2Q[Action Queue A2]; A1Q --> A1W[Workers A1]; A2Q --> A2W[Workers A2]; A1W --> A1Out["Action A1: omfile"]; A2W --> A2Out["Action A2: omfwd"]; Concurrency & Locking --------------------- - The framework may run **multiple workers per action**. - ``wrkrInstanceData_t`` (WID) is **per-worker**; do not share it; no locks needed inside WID. - Shared per-action state (``pData``/``instanceData``) **must be protected by the module** (mutex/rwlock). - **Direct** mode does **not** remove the need to serialize inherently serial resources in the module. Message Processing Lifecycle (Action Queue) ------------------------------------------- 1. Enqueue - Producer pushes message to the action queue. - DA/disk queues persist as configured; memory queues hold in RAM. 2. Worker activation - Idle worker threads wake when items are available. - Thread count respects ``queue.workerThreads`` (and internal min/max). 3. Dequeue and batch - Worker dequeues up to ``queue.dequeueBatchSize`` messages (not less than ``queue.minDequeueBatchSize`` unless draining or shutting down). - Dequeue is FIFO; end-to-end emission may reorder with multiple workers. 4. Execute action - Worker invokes the action module once per batch or per message depending on the action. - Action may signal success, transient failure (retry), or fatal failure (discard or suspend). 5. Commit/finalize - On success, messages are acknowledged and removed (and persisted offsets advanced). - On transient failure, worker applies backoff/suspend/retry logic; messages remain pending. - On fatal failure, behavior follows queue/action discard policy. 6. Idle/scale-down - If the queue is drained, workers may sleep and be reclaimed after timeouts. 7. Shutdown - On shutdown, workers attempt to drain within ``queue.timeoutShutdown`` and honor persistence settings (e.g., ``queue.saveOnShutdown`` for DA/disk). Backpressure and Flow Control ----------------------------- - **Watermarks**: - High-water mark: When reached, producers slow down or block per timeout settings. - Low-water mark: Normal operation resumes below this threshold. - **Discard policy**: - When near/at capacity, use ``queue.discardMark`` and ``queue.discardSeverity`` to shed lower-priority messages first (if configured). - **Timeouts**: - ``queue.timeoutEnqueue``: Producer blocking time when queue is full. - ``queue.timeoutActionCompletion``: How long a worker waits for the action to complete. - **Rate limiting**: - Configurable in some actions to avoid overloading downstreams. .. mermaid:: stateDiagram-v2 [*] --> Empty Empty --> Filling: enqueue Filling --> High: size >= highWatermark High --> Draining: dequeue/batches Draining --> Filling: enqueue > dequeue Draining --> Empty: size == 0 High --> Filling: size < highWatermark Error Handling, Retry, Suspend ------------------------------ - Transient errors trigger backoff: - Worker suspends the action for a short interval and retries later (interval may grow). - Persistent errors: - Depending on module and settings, move to dead-letter semantics, drop, or keep retrying. - Disk-backed safety: - DA and disk queues keep messages across process restarts (subject to sync and checkpoint settings). Queue Types and Selection ------------------------- - **direct**: Lowest latency, no buffering; action must keep up or it becomes a bottleneck. - **in-memory**: High throughput, volatile; messages lost on crash unless DA or disk-backed. - **disk-assisted in-memory (DA)**: Fast under normal load, durable under bursts. - **disk-only**: Highest durability, higher latency; best for critical delivery. Key Parameters (see :doc:`../rainerscript/queue_parameters`) ------------------------------------------------------------ - ``queue.type``: direct, LinkedList (memory), FixedArray (memory), disk, (DA via memory-queue with ``queue.filename`` set). - ``queue.size``: Capacity in number of messages (memory queues). - ``queue.dequeueBatchSize`` / ``queue.minDequeueBatchSize``: Batch sizing. - ``queue.workerThreads``: Max concurrent workers per action. - ``queue.highWatermark`` / ``queue.lowWatermark``: Backpressure thresholds. - ``discardMark`` / ``discardSeverity``: Controlled shedding under pressure. - ``queue.spoolDirectory`` / ``queue.filename``: Disk storage for DA/disk queues. - ``queue.checkpointInterval`` / ``queue.syncQueueFiles``: Durability and fsync policy. - ``queue.timeoutEnqueue`` / ``queue.timeoutShutdown`` / ``queue.timeoutActionCompletion``: Timing behavior. - ``queue.saveOnShutdown``: Persist pending entries at shutdown (DA/disk). Sequence and Error Paths ------------------------ .. mermaid:: sequenceDiagram actor P as Producer participant Q as Action Queue participant W as Worker Thread participant A as Action Module P->>Q: enqueue(msg) Q-->>W: wake W->>Q: dequeue(batch) W->>A: process(batch) A-->>W: success | transient_error | fatal_error alt success W->>Q: commit/remove(batch) else transient_error W->>W: backoff/suspend (retry later) else fatal_error W->>Q: discard or DLQ policy end W-->>Q: next batch or sleep Developer Notes --------------- - Batching improves throughput but increases per-message latency; tune batch sizes per action characteristics. - Parallel workers can reorder across batches; per-queue FIFO is preserved, but global ordering is not. - Avoid blocking in action code; prefer non-blocking I/O and internal buffering where possible. - Ensure action modules clearly communicate transient vs. permanent errors to the engine. Cross-References ---------------- - :doc:`../concepts/queues` - :doc:`../rainerscript/queue_parameters` - :doc:`../whitepapers/queues_analogy`