Queuing

Queuing is useful when you need to allow retries or throttle the number of messages being processed at a time. A queue can be configured in three places in a channel config:

  • In the Source. This queues all messages coming in from the source. The sender gets a quick ack confirming that the message was queued, without waiting for the ingestion flows to run up to an ack flow. When a source queue is used, ack flows in the ingestion array are not sent back to the original sender.
  • In the Route. This queues the message before it is sent to the flows of the route. This is useful if you want to throttle the number of messages sent to a specific route, or to retry the entire route when a transformer, filter, store, or destination flow fails.
  • In RouteFlows. The typings currently allow a queue on any RouteFlow, but only TCP RouteFlows actually queue messages. This is useful if you want to throttle the number of messages sent to a specific destination, or to retry the TCP connection after downtime or another transport failure.
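
As a rough sketch of the three placements, the following uses simplified, hypothetical types (SketchQueue, SketchRouteFlow, SketchRoute, SketchChannel) rather than the engine's real channel typings, which may be shaped differently. Only the idea of attaching a queue at the source, route, or TCP RouteFlow level is taken from the list above.

```typescript
// Hypothetical, simplified shapes for illustration only.
// They are NOT the engine's real channel config typings.
type SketchQueue = {
  kind: 'queue';
  store: 'file' | 'memory';
  concurrent?: number;
  retries?: number;
};

interface SketchRouteFlow {
  kind: 'tcp' | 'other';
  queue?: SketchQueue; // accepted on any flow, but only honoured on TCP flows
}

interface SketchRoute {
  queue?: SketchQueue;
  flows: SketchRouteFlow[];
}

interface SketchChannel {
  source: { queue?: SketchQueue };
  routes: SketchRoute[];
}

const sketch: SketchChannel = {
  // 1. Source queue: quick ack to the sender, ingestion runs later.
  source: { queue: { kind: 'queue', store: 'file' } },
  routes: [
    {
      // 2. Route queue: throttles or retries the whole route.
      queue: { kind: 'queue', store: 'memory', concurrent: 2 },
      flows: [
        // 3. RouteFlow queue on a TCP destination: retried on transport failure.
        { kind: 'tcp', queue: { kind: 'queue', store: 'file', retries: 5 } },
        // A queue on a non-TCP flow type-checks but has no effect.
        { kind: 'other', queue: { kind: 'queue', store: 'memory' } },
      ],
    },
  ],
};

// Lists the places where a configured queue would actually take effect.
function effectiveQueues(channel: SketchChannel): string[] {
  const found: string[] = [];
  if (channel.source.queue) found.push('source');
  channel.routes.forEach((route, r) => {
    if (route.queue) found.push(`route[${r}]`);
    route.flows.forEach((flow, f) => {
      if (flow.queue && flow.kind === 'tcp') found.push(`route[${r}].flow[${f}]`);
    });
  });
  return found;
}
```

Calling effectiveQueues(sketch) lists the source, the route, and the TCP flow, and skips the queue on the non-TCP flow.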

NOTE: TCP destinations that return a NACK do not currently retry the message. This could be added in the future if there is a need for it. In most of the systems I have worked with, the NACK is a permanent failure and the message should be discarded.

The interface QueueConfig is defined as:

interface QueueConfig {
  kind: 'queue'
  filo: boolean // defaults to false
  retries?: number // defaults to Infinity
  id?: (msg: Msg) => string // defaults to crypto.randomUUID()
  concurrent?: number // defaults to 1
  maxTimeout?: number // defaults to 10 * 1000 = 10 seconds
  afterProcessDelay?: number // defaults to 1000 = 1 second
  rotate?: boolean // defaults to false
  verbose?: boolean // defaults to false
  store: 'file' | 'memory'
  stringify?: (msg: Msg) => string // defaults to (msg) => msg.toString()
  parse?: (msg: string) => Msg // defaults to (msg) => new Msg(msg)
}
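
To make the documented defaults concrete, here is a minimal sketch that fills in every optional field of a QueueConfig. The Msg class is a tiny stand-in so the example is self-contained, and withDefaults is a hypothetical helper, not part of the engine; the default values are the ones listed in the interface comments above.

```typescript
import { randomUUID } from 'node:crypto';

// Stand-in for the engine's Msg class (assumption: only toString() is needed here).
class Msg {
  constructor(private readonly text: string) {}
  toString(): string {
    return this.text;
  }
}

interface QueueConfig {
  kind: 'queue';
  filo: boolean;
  retries?: number;
  id?: (msg: Msg) => string;
  concurrent?: number;
  maxTimeout?: number;
  afterProcessDelay?: number;
  rotate?: boolean;
  verbose?: boolean;
  store: 'file' | 'memory';
  stringify?: (msg: Msg) => string;
  parse?: (msg: string) => Msg;
}

// Hypothetical helper: returns the config with the documented defaults applied.
function withDefaults(config: QueueConfig): Required<QueueConfig> {
  return {
    retries: Infinity,
    id: () => randomUUID(),
    concurrent: 1,
    maxTimeout: 10 * 1000,
    afterProcessDelay: 1000,
    rotate: false,
    verbose: false,
    stringify: (msg: Msg) => msg.toString(),
    parse: (raw: string) => new Msg(raw),
    ...config, // explicitly configured values win over the defaults
  };
}

const minimal: QueueConfig = { kind: 'queue', filo: false, store: 'memory' };
const resolved = withDefaults(minimal);
```

Only kind, filo, and store are required by the interface; everything else in resolved comes from the defaults.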

When set to true, the filo option reverses the queue order to First-In-Last-Out instead of the default First-In-First-Out.
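
The difference between the two orderings can be illustrated with a plain array. This is only a sketch of the ordering semantics, not the engine's queue implementation; takeNext and drain are hypothetical names.

```typescript
// Takes the next item: the oldest for FIFO, the newest for FILO.
function takeNext<T>(queue: T[], filo: boolean): T | undefined {
  return filo ? queue.pop() : queue.shift();
}

// Drains a copy of the items and returns them in processing order.
function drain<T>(items: T[], filo: boolean): T[] {
  const queue = [...items];
  const processed: T[] = [];
  while (queue.length > 0) {
    processed.push(takeNext(queue, filo) as T);
  }
  return processed;
}

const arrivals = ['A', 'B', 'C']; // A arrived first, C arrived last
const fifoOrder = drain(arrivals, false); // ['A', 'B', 'C']
const filoOrder = drain(arrivals, true); // ['C', 'B', 'A']
```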

The retries option is the number of times to retry the message before discarding it. The default is Infinity which will retry the message forever.
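
A minimal sketch of the retry accounting follows. It assumes that retries counts attempts after the first one, so retries: 2 allows up to three attempts in total; the engine may count differently. processWithRetries is a hypothetical helper.

```typescript
interface RetryOutcome {
  delivered: boolean;
  attempts: number;
}

// Runs `attempt` until it succeeds or the retry budget is exhausted.
// With the default of Infinity, a message that always fails is retried forever.
function processWithRetries(
  attempt: (attemptNumber: number) => boolean,
  retries: number = Infinity,
): RetryOutcome {
  let attempts = 0;
  while (attempts <= retries) {
    attempts += 1;
    if (attempt(attempts)) {
      return { delivered: true, attempts };
    }
  }
  // Budget exhausted: the message is discarded.
  return { delivered: false, attempts };
}

// Fails twice, then succeeds on the third attempt.
const flaky = processWithRetries((n) => n >= 3, 3);
// Never succeeds: discarded after the first attempt plus two retries.
const discarded = processWithRetries(() => false, 2);
```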

The id option is a function that takes the message and returns a unique string. This is used to identify the message in the queue. The default is crypto.randomUUID(), which generates a random version 4 UUID using a cryptographically secure random number generator. Alternatively, you could use the message control id from the MSH segment with (msg) => msg.get('MSH-10.1'). This is useful to prevent the same message from being in the queue twice at the same time. However, if an id is reused after the previous message with that id has already been processed, the new message will be processed as well.
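
The deduplication behaviour can be sketched as follows. The DedupingQueue class and controlIdOf helper are hypothetical, messages are plain strings instead of Msg objects, and dropping the second copy is an assumption about how a duplicate id is handled while the first copy is still queued.

```typescript
// Extracts MSH-10 from a raw pipe-delimited HL7 message.
// After splitting the MSH segment on '|', MSH-n sits at index n - 1.
function controlIdOf(raw: string): string {
  const msh = raw.split('\r')[0] ?? '';
  return msh.split('|')[9] ?? '';
}

class DedupingQueue {
  private pending = new Map<string, string>();

  constructor(private readonly idOf: (raw: string) => string) {}

  // Returns false when a message with the same id is still queued.
  push(raw: string): boolean {
    const id = this.idOf(raw);
    if (this.pending.has(id)) return false;
    this.pending.set(id, raw);
    return true;
  }

  // Marks a message as processed, which frees its id for reuse.
  complete(id: string): void {
    this.pending.delete(id);
  }

  get size(): number {
    return this.pending.size;
  }
}

const hl7 = 'MSH|^~\\&|APP|FAC|APP2|FAC2|20240101120000||ADT^A01|MSG001|P|2.3';
const dedupe = new DedupingQueue(controlIdOf);

const first = dedupe.push(hl7); // true: queued
const duplicate = dedupe.push(hl7); // false: same id is still pending
dedupe.complete('MSG001');
const afterProcessing = dedupe.push(hl7); // true: id was freed, processed again
```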

The concurrent option is the number of messages to process at a time. The default is 1, which processes one message at a time. This is useful if you want to throttle the number of messages being processed. If you want higher throughput, you can increase this number, but you will most likely experience message reordering. To preserve message order, keep concurrent at 1.
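
The reordering can be shown with a small deterministic simulation. It is not the engine's worker; completionOrder is a hypothetical function that takes per-message processing times and returns the order in which messages finish for a given concurrent value.

```typescript
// Simulates a worker pool and returns message indexes in completion order.
function completionOrder(durations: number[], concurrent: number): number[] {
  const running: { index: number; finishesAt: number }[] = [];
  const finished: number[] = [];
  let clock = 0;
  let next = 0;

  while (next < durations.length || running.length > 0) {
    // Fill every free slot, taking messages in queue order.
    while (running.length < concurrent && next < durations.length) {
      running.push({ index: next, finishesAt: clock + durations[next] });
      next += 1;
    }
    // Advance the clock to whichever running message finishes first.
    running.sort((a, b) => a.finishesAt - b.finishesAt || a.index - b.index);
    const done = running.shift()!;
    clock = done.finishesAt;
    finished.push(done.index);
  }
  return finished;
}

const processingTimes = [30, 10, 20]; // message 0 is slow, message 1 is fast
const serial = completionOrder(processingTimes, 1); // [0, 1, 2]
const parallel = completionOrder(processingTimes, 2); // [1, 0, 2]
```

With concurrent set to 2, the fast second message finishes before the slow first one, so a downstream system would see them out of order.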

The maxTimeout option is the maximum amount of time to wait for a message to be processed before retrying. This is implemented currently by the queueing class, but not yet implemented in the actual flows.

The afterProcessDelay option was initially defined as the amount of time to wait after a message has been processed before processing the next message, but that is not how it is currently implemented. Instead, it sets the interval between polls of the queue worker. For example, with a 1-minute interval and a message that takes 90 seconds to process, the poll at the 1-minute mark returns early because the worker is “still processing”, and the next message is not picked up until the poll at the 2-minute mark. So the value is not the time to wait after a message has been processed, but the time between polls of the queue worker.
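
The polling behaviour described above can be reproduced with a small simulation. This is a sketch assuming a single worker (concurrent of 1) and a first poll at time zero; simulatePolls and its result shape are hypothetical, and times are in seconds for readability.

```typescript
type PollOutcome = 'started' | 'still processing' | 'idle';

interface PollResult {
  at: number;
  outcome: PollOutcome;
}

// Polls every `pollEvery` seconds; each poll starts the next message only if
// the previous one has already finished.
function simulatePolls(pollEvery: number, durations: number[], until: number): PollResult[] {
  const results: PollResult[] = [];
  let busyUntil = 0; // time at which the current message finishes
  let next = 0; // index of the next queued message

  for (let at = 0; at <= until; at += pollEvery) {
    if (at < busyUntil) {
      results.push({ at, outcome: 'still processing' });
    } else if (next < durations.length) {
      busyUntil = at + durations[next];
      next += 1;
      results.push({ at, outcome: 'started' });
    } else {
      results.push({ at, outcome: 'idle' });
    }
  }
  return results;
}

// A 60-second poll interval, a 90-second message, then a 10-second message.
const timeline = simulatePolls(60, [90, 10], 180);
// at 0: started, at 60: still processing, at 120: started, at 180: idle
```

The first message finishes at 90 seconds, but the second one does not start until the poll at 120 seconds.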

The rotate option does not preserve the order of the messages. When a message fails and is requeued, it is placed at the end of the queue. This is useful if you don’t care about the order of the messages and don’t want a single failed message to block the entire queue.
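
The effect of rotate on a failing message can be sketched with one processing step over an array. processOnce is a hypothetical helper and assumes that, without rotate, a failed message stays at the head of the queue.

```typescript
// Processes the head of the queue once and returns the resulting queue.
function processOnce(
  queue: string[],
  succeeds: (msg: string) => boolean,
  rotate: boolean,
): string[] {
  const [head, ...rest] = queue;
  if (head === undefined) return [];
  if (succeeds(head)) return rest; // processed: removed from the queue
  // Failed: requeue at the end when rotating, otherwise keep it at the head.
  return rotate ? [...rest, head] : [head, ...rest];
}

const isDeliverable = (msg: string): boolean => msg !== 'bad';
const start = ['bad', 'ok1', 'ok2'];

const blocked = processOnce(start, isDeliverable, false); // ['bad', 'ok1', 'ok2']
const rotated = processOnce(start, isDeliverable, true); // ['ok1', 'ok2', 'bad']
```

Without rotate, the failing message keeps blocking the messages behind it; with rotate, they move ahead of it at the cost of ordering.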

The verbose option is useful for debugging. It will log the queue events to the console.

The store option is the type of store to use for the queue. Currently, only file and memory are supported. The file store will persist the queue to disk in the OS temp directory. The memory store will keep the queue in memory.

INFO: If you are using the file store, you can stop the server and restart it and the queue will be restored.

DANGER: If you are using the memory store, you will lose the queue if you stop or restart the server, channel, or process.

The stringify and parse options are used to convert the message to a string and back to a message. The default is to use the toString() and new Msg() methods. If you want the queue to store the JSON representation of the HL7 message, you can use:

stringify: (msg) => JSON.stringify(msg.raw()),
parse: (msg) => new Msg(JSON.parse(msg))

NOTE: At this time, OOP-style channel configs do not support queuing.