Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [paho-dev] Multiple Messages on subscribing


Le 21/06/2017 à 15:04, Michael Post a écrit :
> Hello,
> with paho-mqtt i publish 10656 Datasets (Messages) in a loop to
> mosquitto with qos level 2.
> Through the iteration i count the messages and print out the total at
> the end.
> Parallel i proceed the messages via paho-mqtt subscribing (level qos:
> 1). In my tests my client proceed currently about more than 19742 times
> and i do not know why.
"Parallel" ? You means that you have a published and in parallel ONE
client ? If not see later.
> Do you have any clue why this can be happen?
> I expect the behaviour that one message will be send, one client will
> process the message one time and good.
I fear that there is a misunderstood of MQTT protocol here.
If you have 2 clients that both subscribe to the same topic, both
clients will receive all messages sent to that topic.

If you want to shard the load over multiple client, with MQTT I think
the only solutions are:

* using multiple brokers and your publisher(s) must spreed messages over
all brokers. Then have one consumer per broker.
* use multiple topics and publisher(s) speed the messages over all topics
* (slightly less optimal) keep one topic, but when message is received
the consumer decide to process or ignore the message based on a sharding
policy. This seems trickier to implement and still cause all consumers
to receive all messages (but hopefully do a light processing for most of

> Here is my source for the client (python)
> [...]
> # This is the MQTT Broker onDisconnect
> def on_disconnect(client, userdata, rc):
>     while rc != 0:
>         print ("Unexpected MQTT disconnection. Will auto-reconnect")
>         rc = client.reconnect()
I don't think is the best place to reconnect. When using loop_start or
loop_forever paho will reconnect by itself if you get disconnected.
I think it's better to handle reconnection when calling the loop below

> # On every incoming message
> def on_message(client, userdata, msg):
> [...]
>     run = True
>     while run:
>         client.loop()
>         #print("Loop()")
> except KeyboardInterrupt:
>     # Close DBConnection
>     dbConnection.close()
>     pass
Here the best would probably to use loop_forever(). In fact
loop_forever() will terminate if you call disconnect(). (e.g. if
something change run to False in your code, you could also call
client.disconnect() which could terminate loop_forever())

If you stay with manually calling loop(), the reconnect should be better
here, in case loop() returned an error MQTT_ERR_CONN_LOST.


Back to the top