
Re: [mosquitto-dev] Help needed, odd behaviour: What are the reasons the Broker will stop sending PUBLISH to a client?

Roger,

Just to give some feedback on this matter.

The problem:

Using max_inflight_messages=1, with subscriptions and publishes at
QoS 2, the broker can leave a client's queue waiting forever: all
messages of the stuck client _were_ in the `mosq_ms_queued` state and
_were_ outgoing messages.
If all messages are in the idle state (mosq_ms_queued), the broker
stops publishing to that client in this scenario.
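
To make the stall easier to see, here is a small, self-contained C model of
the unpatched queued-message branch of mqtt3_db_message_write(). It is a
simplified sketch under stated assumptions, not mosquitto code: `struct msg`,
the `DIR_*`/`ST_*` enums and `old_write()` are hypothetical stand-ins for
mosquitto's `mosq_md_*`/`mosq_ms_*` values and loop, and the model visits each
message once instead of re-visiting a promoted message as the real loop does.
It only illustrates the point above: in the original logic only incoming
messages are counted, and only incoming messages can leave the queued state.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for mosquitto's mosq_md_* and mosq_ms_* values. */
enum direction { DIR_IN, DIR_OUT };
enum state { ST_QUEUED, ST_PUBLISH_QOS2, ST_SEND_PUBREC };

struct msg {
    enum direction direction;
    enum state state;
    int qos;
};

/* Simplified model of the unpatched loop. Returns how many messages are
 * active (not queued) after one pass over the client's queue. */
static int old_write(struct msg *msgs, size_t n, int max_inflight)
{
    int msg_count = 0; /* the original code counts incoming messages only */
    int active = 0;

    assert(msgs != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        struct msg *m = &msgs[i];

        if (m->direction == DIR_IN) {
            msg_count++;
        }
        /* Only an incoming queued QoS 2 message can be promoted here. */
        if (m->state == ST_QUEUED
                && m->direction == DIR_IN
                && (max_inflight == 0 || msg_count < max_inflight)
                && m->qos == 2) {
            m->state = ST_SEND_PUBREC;
        }
        /* A queued outgoing message is simply skipped: nothing in this
         * branch ever moves it to ST_PUBLISH_QOS2. */
        if (m->state != ST_QUEUED) {
            active++;
        }
    }
    return active;
}
```

With a queue holding only queued outgoing QoS 2 messages, old_write() returns
0 and every message stays in ST_QUEUED whatever max_inflight is, which mirrors
the stuck client described above.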


The workaround:

I don't know why all outgoing messages ended up in the idle state, but I
was able to fix it with the patch below. It might not be the perfect
fix, but I'd like you to take a look at it. I have been using this
patch for over 6 months now.

Let me know if there is anything I can do to help.
-- fanl


--- a/src/database.c
+++ b/src/database.c
@@ -803,9 +803,6 @@ int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context)

     tail = context->msgs;
     while(tail){
-        if(tail->direction == mosq_md_in){
-            msg_count++;
-        }
         if(tail->state != mosq_ms_queued){
             mid = tail->mid;
             retries = tail->dup;
@@ -814,6 +811,7 @@ int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context)
             qos = tail->qos;
             payloadlen = tail->store->payloadlen;
             payload = tail->store->payload;
+            msg_count++;

             switch(tail->state){
                 case mosq_ms_publish_qos0:
@@ -891,15 +889,17 @@ int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context)
             }
         }else{
             /* state == mosq_ms_queued */
-            if(tail->direction == mosq_md_in && (max_inflight == 0 || msg_count < max_inflight)){
-                if(tail->qos == 2){
-                    tail->state = mosq_ms_send_pubrec;
-                }
+            if(tail->direction == mosq_md_in && tail->qos == 2){
+                tail->state = mosq_ms_send_pubrec;
+            }else if(tail->direction == mosq_md_out && tail->qos == 2 && (max_inflight == 0 || msg_count < max_inflight)){
+                /* If the message is queued and is outgoing, it must become active again as long as max_inflight allows it */
+                tail->state = mosq_ms_publish_qos2;
             }else{
                 last = tail;
                 tail = tail->next;
             }
         }
     }

     return MOSQ_ERR_SUCCESS;
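
The effect of the patch on the queued branch can be sketched with the same
kind of simplified model. Again this is an illustration under assumptions,
not mosquitto code: `struct msg`, the `DIR_*`/`ST_*` enums and `new_write()`
are hypothetical, and each message is visited once. Following the patch,
msg_count now counts every active message regardless of direction, a queued
outgoing QoS 2 message is promoted while msg_count is below max_inflight (or
max_inflight is 0), and the incoming QoS 2 case no longer checks max_inflight.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for mosquitto's mosq_md_* and mosq_ms_* values. */
enum direction { DIR_IN, DIR_OUT };
enum state { ST_QUEUED, ST_PUBLISH_QOS2, ST_SEND_PUBREC };

struct msg {
    enum direction direction;
    enum state state;
    int qos;
};

/* Simplified model of the patched loop. Returns msg_count, i.e. how many
 * messages are active (not queued) after one pass. */
static int new_write(struct msg *msgs, size_t n, int max_inflight)
{
    int msg_count = 0; /* the patched code counts every active message */

    assert(msgs != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        struct msg *m = &msgs[i];

        if (m->state == ST_QUEUED) {
            if (m->direction == DIR_IN && m->qos == 2) {
                /* incoming QoS 2: no max_inflight check, as in the patch */
                m->state = ST_SEND_PUBREC;
            } else if (m->direction == DIR_OUT && m->qos == 2
                    && (max_inflight == 0 || msg_count < max_inflight)) {
                /* queued outgoing QoS 2 message becomes active again */
                m->state = ST_PUBLISH_QOS2;
            }
        }
        if (m->state != ST_QUEUED) {
            msg_count++;
        }
    }
    return msg_count;
}
```

For three queued outgoing QoS 2 messages and a max_inflight of 1, new_write()
promotes only the first one and returns 1; in this model the next message is
promoted on a later pass once the first has been removed from the queue.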


On Tue, Jul 7, 2015 at 9:11 PM, Felipe de Andrade Neves Lavratti
<felipelav@xxxxxxxxx> wrote:
> Well guys, my conclusion is this:
>
> I reproduced the odd behavior while monitoring the Mosquitto
> process with gdb, and could see that all messages of the stuck client
> _were_ in the `mosq_ms_queued` state and _were_ outgoing messages.
>
> If all messages in the queue of a client context are in the
> `mosq_ms_queued` state and all of them are outgoing, the client gets
> stuck because there's no code that will change their state in this
> condition. As far as I could understand, in Mosquitto a
> message will only leave the `mosq_ms_queued` state when the broker
> receives a PUBREL from the client.
>
> So, for that reason, I created this patch, which makes a queued
> message active again if it is outgoing, as long as that doesn't
> violate max_inflight.
>
> It has been properly tested against my problem and it went very well.
> I didn't test any other case, so further testing may be needed.
>
> What do you guys think of the proposed patch below?
>
> ----------
>
> diff --git a/src/database.c b/src/database.c
> index c984477..19ce3e2 100644
> --- a/src/database.c
> +++ b/src/database.c
> @@ -803,9 +803,6 @@ int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context)
>
>         tail = context->msgs;
>         while(tail){
> -               if(tail->direction == mosq_md_in){
> -                       msg_count++;
> -               }
>                 if(tail->state != mosq_ms_queued){
>                         mid = tail->mid;
>                         retries = tail->dup;
> @@ -814,6 +811,7 @@ int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context)
>                         qos = tail->qos;
>                         payloadlen = tail->store->payloadlen;
>                         payload = tail->store->payload;
> +                       msg_count++;
>
>                         switch(tail->state){
>                                 case mosq_ms_publish_qos0:
> @@ -891,15 +889,23 @@ int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context)
>                         }
>                 }else{
>                         /* state == mosq_ms_queued */
> -                       if(tail->direction == mosq_md_in && (max_inflight == 0 || msg_count < max_inflight)){
> +                       if(max_inflight == 0 || msg_count < max_inflight){
> +                               if(tail->direction == mosq_md_in){
>                                         if(tail->qos == 2){
>                                                 tail->state = mosq_ms_send_pubrec;
>                                         }
>                                 }else{
> +                                       /* If the message is queued and is outgoing, it must become active again as long as max_inflight allows it */
> +                                       if(tail->qos == 2){
> +                                               tail->state = mosq_ms_publish_qos2;
> +                                       }
> +                               }
> +                       }else{
>                                 last = tail;
>                                 tail = tail->next;
>                         }
>                 }
> +
>         }
>
>         return MOSQ_ERR_SUCCESS;
>
> On Tue, Jul 7, 2015 at 7:31 PM, Felipe de Andrade Neves Lavratti
> <felipelav@xxxxxxxxx> wrote:
>> Sorry! I forgot to mention that I've already updated to 1.4.2. The
>> last log I e-mailed was from 1.4.2.
>>
>> On Tue, Jul 7, 2015 at 7:29 PM, Roger Light <roger@xxxxxxxxxx> wrote:
>>> Hi Felipe,
>>>
>>> I'll have to look at this again tomorrow night, but in the meantime
>>> please consider trying a more up-to-date version of mosquitto as well.
>>> There were 206 files changed, 5532 insertions(+), 2524 deletions(-)
>>> between some point after 1.3.5 and version 1.4.2, so you may be chasing a
>>> problem that isn't as bad as you think (there is still a problem for
>>> sure though).
>>>
>>> Cheers,
>>>
>>> Roger
>>>
>>>
>>>
>>> On Tue, Jul 7, 2015 at 11:20 PM, Felipe de Andrade Neves Lavratti
>>> <felipelav@xxxxxxxxx> wrote:
>>>> The client implementation is mine-ish, actually. The client runs on
>>>> bare-metal firmware (Contiki) where there's no dynamic memory
>>>> allocation; that's one reason I need max inflight=1. The other is
>>>> that I need guaranteed ordering.
>>>>
>>>> I changed the keepalive timer myself; I'll test with 60s.
>>>>
>>>> I am not using client expiry. No connection was re-established during
>>>> the tests, so I think I didn't get what you meant.
>>>>
>>>>
>>>> New info:
>>>>
>>>> I've just been debugging the mosquitto broker at the moment the odd
>>>> behavior was detected. The client context (struct mosquitto *) that
>>>> has stopped receiving PUBLISH has all pending messages in the
>>>> `mosq_ms_queued` state. The point is that they won't leave that state.
>>>> I haven't found out why yet.
>>>>
>>>> On Tue, Jul 7, 2015 at 7:06 PM, Karl Palsson <karlp@xxxxxxxxxxxx> wrote:
>>>>> Felipe de Andrade Neves Lavratti <felipelav@xxxxxxxxx> wrote:
>>>>>> Again,
>>>>>>
>>>>>> The problem is easy to replicate:
>>>>>>
>>>>>> The setup is this:
>>>>>> - I have 6 clients connected to the broker
>>>>>> - All clients are subscribed to the topic 'test'
>>>>>> - All clients reply on topic 'testout' for each message received in topic 'test'
>>>>>
>>>>> fwiw, you might find it easier to diagnose if you send a sequence
>>>>> instead of a fixed string.  I personally found the mids to be rather
>>>>> odd in your earlier trace.
>>>>>
>>>>>> - All clients send PINGs every 20s
>>>>>> - A connection expires in 60s
>>>>>
>>>>> so, is keepalive 20 sec, and you have privately modified code then?  and
>>>>> you're still based on 1.3.x?  or something else?  note in your traces
>>>>> that d==0, so persistent client expiry is completely irrelevant, custom
>>>>> code or not.
>>>>>
>>>>>> - Every message is QOS2
>>>>>>
>>>>>> The test routine is this:
>>>>>> Publish to the topic 'test' periodically at a frequency faster than
>>>>>> the clients can answer (on my slow 802.15.4 network I use 3 seconds)
>>>>>
>>>>> honestly I think your contiki setup is fundamentally broken, and you
>>>>> should chase this up there.  read the pinola paper just for starters,
>>>>> you should be looking at _maybe_ 250ms.  3 seconds is _insane_ and even
>>>>> if you hack mosquitto to make this particular test case pass, I think
>>>>> you will just see new problems down the road.  please consider Roger's
>>>>> points about (ab)using (or trying) client expiry to "fix" things that
>>>>> maybe need a different solution.
>>>>>
>>>>> Cheers,
>>>>> Karl P
>>>>>
>>>>>
>>>>>> until you realise that the broker will stop sending any publish to a
>>>>>> client until the client reconnects.
>>>>>>
>>>>>> It feels like a bug. What extra info do you need on this?
>>>>>>
>>>>>> Below is another log of the problem.
>>>>>>
>>>>>> --------
>>>>>>
>>>>>>
>>>>>> **** Send a PUBLISH to everybody, including fe80::215:8d00:76:813f.
>>>>>>
>>>>>> 1436302152: Sending PUBLISH to aaaa::215:8d00:76:813a (d0, q2, r0, m46, 'test', ... (9 bytes))
>>>>>> 1436302152: Sending PUBLISH to fe80::215:8d00:76:8140 (d0, q2, r0, m46, 'test', ... (9 bytes))
>>>>>> 1436302152: Sending PUBLISH to fe80::215:8d00:76:813b (d0, q2, r0, m46, 'test', ... (9 bytes))
>>>>>> 1436302152: Sending PUBLISH to fe80::215:8d00:76:8136 (d0, q2, r0, m46, 'test', ... (9 bytes))
>>>>>> 1436302152: Sending PUBLISH to fe80::215:8d00:76:8139 (d0, q2, r0, m46, 'test', ... (9 bytes))
>>>>>> 1436302152: Sending PUBLISH to fe80::215:8d00:76:813f (d0, q2, r0, m46, 'test', ... (9 bytes))
>>>>>>
>>>>>
>>>>>
>>>>> _______________________________________________
>>>>> mosquitto-dev mailing list
>>>>> mosquitto-dev@xxxxxxxxxxx
>>>>> To change your delivery options, retrieve your password, or unsubscribe from this list, visit
>>>>> https://dev.eclipse.org/mailman/listinfo/mosquitto-dev
>>>>
>>>>
>>>>
>>>> --
>>>> Skype: felipeanl
>>
>>
>>
>
>
>




