How to detect queue overruns for persistent sessions?

I am trying to transfer data from a sensor to our backend over MQTT, and it should not lose data. Setup:

  • Self-hosted EMQX cluster, 3 nodes. Max queue size is the default 1000 messages.
  • One producer, say a simple counter publishing messages 1, 2, 3, etc. at a rate of 10 per second. Messages are published with QoS 1.
  • One consumer that subscribes to the same topic. It uses a persistent session with a long expiry interval (say 3600 seconds).

Procedure:

  1. The producer is started and publishes messages with increasing integers.
  2. The consumer is started for the first time and picks up from the most recent integer (10).
  3. The consumer (or its connection) is interrupted and does not consume any data after 15.
  4. The producer keeps publishing integers.
  5. The consumer is restarted much later, when 2000 has been published. It immediately receives integers 1000 to 2000, but the integers between 15 and 1000 are missing because they were dropped from the queue.

The one exception is messages that were ‘in flight’: already sent but not yet acknowledged. These are the last few before the interruption, so depending on the timing, the consumer may also receive 15, 16 and a few more.
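To make the scenario concrete, here is a minimal Python sketch of the behaviour described above. It is a simulation, not EMQX code: it assumes the queue drops its oldest entries when full (which is what the procedure above suggests), it ignores in-flight messages, and the function names `simulate_offline_queue` and `find_gaps` are made up for illustration. It also shows that the consumer can detect the overrun itself if the payload carries a sequence number.

```python
from collections import deque


def simulate_offline_queue(last_consumed, last_published, max_queue_len=1000):
    """Return the messages still queued when the consumer reconnects.

    Assumption: a full queue discards its oldest entry for each new one.
    """
    queue = deque(maxlen=max_queue_len)  # a full deque drops from the left
    for n in range(last_consumed + 1, last_published + 1):
        queue.append(n)
    return list(queue)


def find_gaps(last_seen, received):
    """Return (first_missing, last_missing) ranges in a sequence of integers."""
    gaps = []
    expected = last_seen + 1
    for n in received:
        if n > expected:
            gaps.append((expected, n - 1))
        expected = max(expected, n + 1)
    return gaps


# Consumer stopped after 15, producer is at 2000, queue holds 1000 messages.
delivered = simulate_offline_queue(last_consumed=15, last_published=2000)
gaps = find_gaps(last_seen=15, received=delivered)
print(delivered[0], delivered[-1], gaps)
```

With these numbers the consumer can see on reconnect that a range is missing, but the producer still learns nothing, which is the actual question.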

Is there a way that the producer can be made aware that the broker is losing messages?

Hello,

Persistent sessions in MQTT are subscriber-centered. When a message is published to a topic, it makes no difference from the publisher’s perspective whether the topic has subscribers with or without a persistent session, or no subscribers at all. The MQTT protocol has no means of letting publishers know what happens to the published messages on the subscriber side.
(Same as in Kafka, for example)

Perhaps the solution you’re looking for is to create a separate topic where the subscriber reports its progress in consuming the data, and which the publisher can monitor?
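A rough sketch of the publisher-side bookkeeping for such a progress topic, in Python. Everything here is hypothetical: the class name `BackpressureGate`, the 80% headroom, and the idea that the subscriber reports the last sequence number it consumed. The MQTT wiring is left out; `on_progress` would be called from the client’s message callback for the progress topic, and `on_published` after each publish.

```python
class BackpressureGate:
    """Tracks how far the subscriber lags behind the publisher.

    Assumption: the subscriber periodically reports the last sequence
    number it has consumed on a separate progress topic.
    """

    def __init__(self, max_queue_len=1000, headroom=0.8):
        # Stop publishing before the broker-side queue is actually full.
        self.threshold = int(max_queue_len * headroom)
        self.last_published = 0
        self.last_consumed = 0

    def on_published(self, seq):
        self.last_published = max(self.last_published, seq)

    def on_progress(self, seq):
        self.last_consumed = max(self.last_consumed, seq)

    def lag(self):
        return self.last_published - self.last_consumed

    def may_publish(self):
        # False means: buffer locally instead of publishing.
        return self.lag() < self.threshold


gate = BackpressureGate(max_queue_len=1000)
gate.on_progress(15)      # subscriber last reported 15, then went silent
gate.on_published(814)
print(gate.lag(), gate.may_publish())
gate.on_published(815)
print(gate.lag(), gate.may_publish())
```

The threshold is deliberately lower than the queue size, because progress reports arrive with some delay and the publisher should stop before messages start being dropped.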

Thanks @dmif! That is indeed an interesting idea, to have the subscriber post its status (possibly using LWT) to a topic to notify publishers when they should slow down.

Hello @dmif

I have a follow-up question on this.
We have implemented this and it works for our custom solution, but it doesn’t work with some off-the-shelf hardware we are using.
We would like to be able to let off-the-shelf hardware buffer messages locally once messages are no longer being consumed from the broker. Is it possible to set up a system where we can auto-reject all messages sent to a certain topic (with wildcard)?
i.e. Can we auto-reject all messages sent to sensors/weather/# once the consumer is no longer consuming?
If not, is there a different solution that could work?

Can we auto-reject all messages sent to sensors/weather/# once the consumer is no longer consuming?

If you use MQTT v5, the PUBACK packet contains a reason code (see the MQTT Version 5.0 specification).
EMQX will return reason code 16 (0x10) when there are no subscribers. The client can use this reason code to decide to keep resending the messages. However, I don’t believe the client library can do this automatically, so it probably has to be implemented in the business logic.
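As an illustration of what that business logic might look like, here is a small Python sketch of a resend buffer keyed on the PUBACK reason code. The reason code values come from the MQTT 5.0 specification (0x00 is Success, 0x10 is No matching subscribers); the `ResendBuffer` class and its methods are hypothetical, and hooking `on_puback` into a real client library’s acknowledgement callback is left out.

```python
from collections import OrderedDict

# PUBACK reason codes defined by MQTT 5.0
SUCCESS = 0x00
NO_MATCHING_SUBSCRIBERS = 0x10


class ResendBuffer:
    """Keeps published messages until the broker reports full success."""

    def __init__(self):
        self.pending = OrderedDict()  # sequence number -> payload

    def track(self, seq, payload):
        """Remember a message that has just been published."""
        self.pending[seq] = payload

    def on_puback(self, seq, reason_code):
        """Return True if the message can be forgotten."""
        if reason_code == SUCCESS:
            self.pending.pop(seq, None)
            return True
        # 0x10 (nobody subscribed) or an error code: keep it for a retry.
        return False

    def to_resend(self):
        """Messages that still need to be published again, oldest first."""
        return list(self.pending.items())


buf = ResendBuffer()
for seq in (1, 2, 3):
    buf.track(seq, str(seq).encode())

buf.on_puback(1, SUCCESS)
buf.on_puback(2, NO_MATCHING_SUBSCRIBERS)
buf.on_puback(3, 0x80)  # Unspecified error
print(buf.to_resend())
```

Note that this treats 0x10 the same as a failure, even though the specification classes it as a success code. That is a choice of this sketch, not something a client library does by default.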

Thanks for the documentation. Unfortunately we can’t implement this in the business logic on some of our sensors. Is there a way for EMQX to return a different reason code if there are no subscribers? Maybe 0x80?

This may be possible with a custom plugin.

P.S. BTW, EMQX 5.7 will support durable persistent sessions. They don’t store messages in the mqueue; instead, they will keep them on disk, so effectively no messages will be dropped due to mqueue overrun.