QlobberPG

Creates a new QlobberPG object for publishing and subscribing to a PostgreSQL queue.

new QlobberPG(options: Object)

Extends EventEmitter

Parameters
options (Object) Configures the PostgreSQL queue.
Name Description
options.name String Unique identifier for this QlobberPG instance. Every instance connected to the queue at the same time must have a different name.
options.db Object node-postgres configuration used for communicating with PostgreSQL.
options.single_ttl Integer (default 1h, i.e. 3600000) Default time-to-live (in milliseconds) for messages which should be read by at most one subscriber. This value is added to the current time and the resulting expiry time is stored in the message's database row. After the expiry time, the message is ignored and deleted when convenient.
options.multi_ttl Integer (default 1m, i.e. 60000) Default time-to-live (in milliseconds) for messages which can be read by many subscribers. This value is added to the current time and the resulting expiry time is stored in the message's database row. After the expiry time, the message is ignored and deleted when convenient.
options.expire_interval Integer (default 10s, i.e. 10000) Number of milliseconds between sweeps that delete expired messages from the database.
options.poll_interval Integer (default 1s, i.e. 1000) Number of milliseconds between checks of the database for new messages.
options.notify Boolean (default true) Whether to use a database trigger to watch for new messages. Note that this will be done in addition to polling the database every poll_interval milliseconds.
options.message_concurrency Integer (default 1) The number of messages to process at once.
options.handler_concurrency Integer (default 0) By default (0), a message is considered handled by a subscriber only when all its data has been read. If you set handler_concurrency to non-zero, a message is considered handled as soon as a subscriber receives it. The next message will then be processed straight away. The value of handler_concurrency limits the number of messages being handled by subscribers at any one time.
options.order_by_expiry Boolean (default false) Pass messages to subscribers in order of their expiry time.
options.dedup Boolean (default true) Whether to ensure each handler function is called at most once when a message is received.
options.single Boolean (default true) Whether to process messages meant for at most one subscriber (across all QlobberPG instances), i.e. work queues.
options.separator String (default '.') The character to use for separating words in message topics.
options.wildcard_one String (default '*') The character to use for matching exactly one word in a message topic to a subscriber.
options.wildcard_some String (default '#') The character to use for matching zero or more words in a message topic to a subscriber.
options.filter (Function | Array<Function>)? Function called before each message is processed.
  • The function signature is: (info, handlers, cb(err, ready, filtered_handlers))
  • You can use this to filter the subscribed handler functions to be called for the message (by passing the filtered list as the third argument to cb).
  • If you want to ignore the message at this time then pass false as the second argument to cb. options.filter will be called again later with the same message.
  • Defaults to a function which calls cb(null, true, handlers).
  • handlers is an ES6 Set, or array if options.dedup is falsey.
  • filtered_handlers should be an ES6 Set, or array if options.dedup is falsey. If not, new Set(filtered_handlers) or Array.from(filtered_handlers) will be used to convert it.
  • You can supply an array of filter functions - each will be called in turn with the filtered_handlers from the previous one.
  • An array containing the filter functions is also available as the filters property of the QlobberPG object and can be modified at any time.
options.batch_size Integer (default 100) Passed to node-pg-query-stream (https://github.com/brianc/node-pg-query-stream) and specifies how many messages to retrieve from the database at a time (using a cursor).
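
The options.filter contract described above can be sketched without a database. The example below is only an illustration under stated assumptions: make_capacity_filter, skip_paused, the paused property on handlers and the run_filters helper are all hypothetical names, and run_filters is a stand-in for the chaining behaviour, not QlobberPG's own code. It assumes options.dedup is true, so handlers arrive as a Set.

```javascript
// Hypothetical filter: defers every message while state.busy is set.
function make_capacity_filter(state) {
    return function (info, handlers, cb) {
        if (state.busy) {
            // Not ready: pass false as the second argument so the
            // message is offered to the filter again later.
            return cb(null, false);
        }
        cb(null, true, handlers);
    };
}

// Hypothetical filter: drops handlers flagged with a `paused` property
// (an assumption made for this example only).
function skip_paused(info, handlers, cb) {
    const filtered = new Set();
    for (const handler of handlers) {
        if (!handler.paused) {
            filtered.add(handler);
        }
    }
    cb(null, true, filtered);
}

// Stand-in for chaining: each filter receives the filtered handlers
// produced by the previous one. Stops early on error or when a filter
// reports that the message is not ready.
function run_filters(filters, info, handlers, done) {
    let i = 0;
    function next(err, ready, filtered) {
        if (err || !ready) {
            return done(err, false);
        }
        if (i === filters.length) {
            return done(null, true, filtered);
        }
        filters[i++](info, filtered, next);
    }
    next(null, true, handlers);
}

const handler_a = () => {};
const handler_b = () => {};
handler_b.paused = true;

run_filters(
    [make_capacity_filter({ busy: false }), skip_paused],
    {},
    new Set([handler_a, handler_b]),
    (err, ready, filtered) => {
        console.log(ready, filtered.size); // prints: true 1
    }
);
```

With QlobberPG itself, functions of this shape would be supplied as options.filter (singly or as an array), and the library would take the place of run_filters.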
Instance Members
refresh_now()
force_refresh()
stop(cb?)
stop_watching(cb)
subscribe(topic, handler, options?, cb?)
unsubscribe(topic?, handler?, cb?)
publish(topic, payload, options, cb?)
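
Topics passed to subscribe are matched against published topics according to options.separator, options.wildcard_one and options.wildcard_some. The sketch below illustrates those matching rules as described under Parameters. topic_matches is a hypothetical helper written for this example; it is not part of QlobberPG and does not reflect how the library performs matching internally.

```javascript
// Returns true if `topic` matches the subscription `pattern`.
// opts mirrors the QlobberPG option names and their documented defaults.
function topic_matches(pattern, topic, opts = {}) {
    const separator = opts.separator || '.';
    const wildcard_one = opts.wildcard_one || '*';
    const wildcard_some = opts.wildcard_some || '#';

    const p = pattern.split(separator);
    const t = topic.split(separator);

    // Match pattern words from index pi against topic words from index ti.
    function match(pi, ti) {
        if (pi === p.length) {
            // Pattern exhausted: succeed only if the topic is too.
            return ti === t.length;
        }
        if (p[pi] === wildcard_some) {
            // Zero or more words: try every possible number of consumed words.
            for (let k = ti; k <= t.length; k++) {
                if (match(pi + 1, k)) {
                    return true;
                }
            }
            return false;
        }
        if (ti === t.length) {
            return false;
        }
        // wildcard_one consumes exactly one word; otherwise words must be equal.
        return (p[pi] === wildcard_one || p[pi] === t[ti]) && match(pi + 1, ti + 1);
    }

    return match(0, 0);
}

console.log(topic_matches('foo.*', 'foo.bar'));     // true
console.log(topic_matches('foo.*', 'foo.bar.baz')); // false
console.log(topic_matches('foo.#', 'foo'));         // true
console.log(topic_matches('foo.#', 'foo.bar.baz')); // true
```

Custom characters can be tried the same way, for example topic_matches('foo/+', 'foo/bar', { separator: '/', wildcard_one: '+' }).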
Events
error
start
stop
warning