Hello,
         
        I’ve integrated the Paho C library (paho.c) into one
          of my programs on Ubuntu 10.04 LTS (kernel 2.6 with the preempt
          patches), using it to connect to a Mosquitto broker running on
          the same machine. I am using the asynchronous API. To build
          the Paho library, I cloned the git repository and built from
          source. I use the library to both subscribe and publish on
          topics with the broker.
         
        The program using the Paho library frequently crashes
          with the error:
        pthread_mutex_lock.c:62: __pthread_mutex_lock: Assertion
          `mutex->__data.__owner == 0' failed.
         
        Does anyone have any experience with this
          error? My program itself creates no threads or mutexes, so the
          error must be coming from Paho. I’ve tried various solutions to
          no avail. Sometimes the program runs for 6 hours, sometimes for
          20 seconds. The relevant source code is below; it largely
          replicates the async examples.
         
        Any insight is appreciated. Thank you in
          advance.
         
        Tom
         
        #include <stdio.h>
        #include <stdlib.h>
        #include <string.h>

        #include <MQTTAsync.h>
         
        /* Broker URI handed to MQTTAsync_create() in mqCoreLink_start(). */
        #define ADDRESS     "tcp://localhost:1883"
        /* Client identifier handed to MQTTAsync_create(). */
        #define CLIENTID    "MyClientId"
        /* Quality of service used for every published message (pubmsg.qos). */
        #define QOS         1
        /* Not referenced anywhere in the code shown here. */
        #define TIMEOUT     10000L
         
        /* Client handle: created in mqCoreLink_start(), destroyed in mqCoreLink_stop(). */
        static MQTTAsync client_;
        /*
         * Connection flag written by the connect/disconnect callbacks.
         * NOTE(review): Paho invokes MQTTAsync callbacks on its own background
         * threads, so this flag is presumably shared between threads without
         * synchronisation -- confirm, and consider an atomic type.
         */
        static BOOL isConnected_ = FALSE;
         
         
         
        /*
         * Trace sink installed with MQTTAsync_setTraceCallback(): prints every
         * trace line the library hands over to stdout, prefixed with "TRACE: ".
         *
         * level   - severity of the trace entry (not used for filtering here).
         * message - trace text supplied by the library.
         */
        void
        mqTraceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
        {
            (void)level;

            /* Passing NULL to %s is undefined behaviour, so guard it. */
            printf("TRACE: %s\n", message ? message : "(null)");
        }
         
        //
        // Start the client connection to the MQTT
          broker.
        //
        BOOL
        mqCoreLink_start(void)
        {
                        int rc;
                        MQTTAsync_connectOptions
          conn_opts = MQTTAsync_connectOptions_initializer;
                        
                        MQTTAsync_create(
                                       
          &client_, 
                                        ADDRESS, 
                                        CLIENTID, 
                                       
          MQTTCLIENT_PERSISTENCE_NONE,
          
                                        NULL );
                                        
                        MQTTAsync_setCallbacks(
                                        client_,
                          // client handle
                                        NULL,
                             // context (will be passed to every
          callback function)
                                        connlost,
                      // pointer to connection lost callback
                                       
          messageArrived,                              // pointer to
          message-received callback
                                        NULL
          );                 // pointer to delivery-complete callback
                                        
                        conn_opts.keepAliveInterval
          = 20;
                        conn_opts.cleansession = 1;
                        conn_opts._onSuccess_ =
          onConnect;
                        conn_opts._onFailure_ =
          onConnectFailure;
                        conn_opts.context =
          client_;
                        
                        rc =
          MQTTAsync_connect(client_, &conn_opts);
          
                        if ( rc !=
          MQTTASYNC_SUCCESS)
                        {
                                       
          printf("Failed to start connect, return code %d\n", rc);
                                        return
          FALSE;      
                        }
                        
                        //
                        // Create the default
          command interface.
                        // 
                       
          mqCoreLink_registerSignal("_command", NULL);
         
         
         
                        //
                        // Enable tracing
                        // If no callback is set
          (NULL), the trace values
                        // are not broadcast.
                        //
                        MQTTAsync_setTraceCallback
          (&mqTraceCallback);
                        // The default is minimum.
                       
          MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
                        
         
         
                        return TRUE;
        }
         
         
        void
        mqCoreLink_stop(void)
        {
                       
          MQTTAsync_destroy(&client_);
        }
         
         
        BOOL
        mqCoreLink_sendMessage(char* topic, char*
          msg, int msgLen)
        {
                        int rc;     
                        BOOL ret = FALSE;
                        MQTTAsync_responseOptions
          opts = MQTTAsync_responseOptions_initializer;
                        MQTTAsync_message pubmsg =
          MQTTAsync_message_initializer;
                                          
                        opts._onSuccess_ = onSend;
                        opts.context = client_;
                        pubmsg.payload = msg;
                        pubmsg.payloadlen = msgLen;
                        pubmsg.qos = QOS;
                        pubmsg.retained = 0;
                        deliveredtoken = 0;
         
                        rc =
          MQTTAsync_sendMessage(client_, topic, &pubmsg, &opts);
          
                        if ( rc !=
          MQTTASYNC_SUCCESS)
                        {
                                       
          printf("Failed to start sendMessage, \n"
                                                       
          "\t%s\n"
                                                       
          "\treturn code %d\n", topic, rc);
                                        goto quit;
                        }              
                        
                        
                        ret = TRUE;
                        
        quit:
                        return ret;
        }
         
         
         
        void 
        connlost(void *context, char *cause)
        {
                        MQTTAsync client =
          (MQTTAsync)context;
                        MQTTAsync_connectOptions
          conn_opts = MQTTAsync_connectOptions_initializer;
                        int rc;
                        
                        printf("\nConnection
          lost\n");
                        printf("     cause: %s\n",
          cause);
                        printf("Reconnecting\n");
                        
                        isConnected_ = FALSE;
                        
                        conn_opts.keepAliveInterval
          = 20;
                        conn_opts.cleansession = 1;
                        if ((rc =
          MQTTAsync_connect(client, &conn_opts)) !=
          MQTTASYNC_SUCCESS)
                        {
                                                       
          printf("Failed to start connect, return code %d\n", rc);
                        }
        }
         
         
        /*
         * Success callback for a disconnect request: logs the event and clears
         * the connected flag. Neither argument is used.
         */
        void
        onDisconnect(void* context, MQTTAsync_successData* response)
        {
            (void)context;
            (void)response;

            printf("Successful disconnection\n");
            isConnected_ = FALSE;
        }
         
         
        /*
         * Success callback for MQTTAsync_sendMessage(). Intentionally does
         * nothing; the delivery log line below is kept disabled as in the
         * original.
         */
        void
        onSend(void* context, MQTTAsync_successData* response)
        {
            (void)context;
            (void)response;

            // printf("Message with token value %d delivery confirmed\n", response->token);
        }
         
         
        ////////////////////////////////////////////////////////////
        // Subscription callbacks.
        ////////////////////////////////////////////////////////////
         
        /*
         * Success callback for the subscribe request issued in onConnect():
         * only logs the event. Neither argument is used.
         */
        void
        onSubscribe(void* context, MQTTAsync_successData* response)
        {
            (void)context;
            (void)response;

            printf("Subscribe succeeded\n");
        }
         
         
        /*
         * Failure callback for the subscribe request issued in onConnect():
         * logs the library's return code, or 0 when no failure data is given.
         */
        void
        onSubscribeFailure(void* context, MQTTAsync_failureData* response)
        {
            (void)context;

            printf("Subscribe failed, rc %d\n", response ? response->code : 0);
        }
         
         
        ////////////////////////////////////////////////////////////
        // Connection Callbacks
        ////////////////////////////////////////////////////////////
         
         
        /*
         * Failure callback for a connect request: logs the library's return
         * code (0 when no failure data is given) and clears the connected flag.
         */
        void
        onConnectFailure(void* context, MQTTAsync_failureData* response)
        {
            (void)context;

            printf("Connect failed, rc %d\n", response ? response->code : 0);
            isConnected_ = FALSE;
        }
         
         
         
        //
        // Called back from the paho mqtt library
          upon successful connection
        // to the MQTT broker. We subscribe all
          registered topic names at this
          
        // point.
        //
        void 
        onConnect(void* context,
          MQTTAsync_successData* response)
        {
        int topicCount;
                        int tmpCount;
                        
                        MQTTAsync client =
          (MQTTAsync)context;
                        MQTTAsync_responseOptions
          opts = MQTTAsync_responseOptions_initializer;
                        int rc;
                        int i;
                        
                        CoreLinkEvent*
          cle_elt;                
          
                        
                        isConnected_ = TRUE;
                        
                        topicCount = 0;
                       
          DL_FOREACH(coreLinkEvents_head_,cle_elt)
                        {
                                        if (
          cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
                                                       
          topicCount += 1;
                        }
                        
                        topicNames_ = alloca(sizeof
          *topicNames_ * topicCount);
                        qos_ = alloca( (topicCount
          + 1) * sizeof(int) );
         
                        for (i = 0; i <
          topicCount; ++i) 
                        {
                                       
          topicNames_[i] = alloca(CORELINK_MAXTOPICLEN);
                        }
                        
         
                        //
                        // Copy in all registered
          topic names
                        //
                        tmpCount = 0;
                       
          DL_FOREACH(coreLinkEvents_head_,cle_elt)
                        {
                                        if (
          cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
                                        {
                                                       
          strncpy( 
          
                                                                       
          topicNames_[tmpCount],
          
                                                                       
          cle_elt->topic,
          
                                                                       
          CORELINK_MAXTOPICLEN );
                                                       
          tmpCount += 1;
                                        }
                        }
                        
                                        
                        //
                        // Make a quality of
          service array
                        //
                        for
          (i=0;i<topicCount;++i)
                        {
                                        qos_[i] =
          1;
                                       
          printf("subscribe: %s\n", &(*topicNames_[i]) );
                        }
                        
         
                        //
                        // Subscribe to all topics
                        //
         
                        printf("Successful
          connection\n");
                        printf("Subscribing to many
          topics..\n");
                        
                        opts._onSuccess_ =
          onSubscribe;
                        opts._onFailure_ =
          onSubscribeFailure;
                        opts.context = client;
         
                        rc =
          MQTTAsync_subscribeMany(
                                        client_, 
                                        topicCount,
          
                                       
          topicNames_, 
                                       
          &(qos_[0]), 
                                        &opts);
          
                                        
                        if ( rc !=
          MQTTASYNC_SUCCESS )
                        {
                                       
          printf("Failed to subscribe to topics! %d\n", rc);
                        }
        }
         
         
         
        //////////////////////////////////////////////////////////////////////////
        // Handle receiving messages from the MQTT broker.
        // Generally, this means that a user interface running in a web-browser
        // has published to a topic listed in the cmdTopics[] array.
        //////////////////////////////////////////////////////////////////////////
         
        /*
         * Message-arrived callback: prints the topic and the raw payload
         * bytes to stdout, then releases the message and the topic string
         * through the library's free functions.
         *
         * context   - value registered with MQTTAsync_setCallbacks() (unused
         *             in the code shown).
         * topicName - topic the message was published on; freed here.
         * topicLen  - topic length reported by the library (unused in the
         *             code shown).
         * message   - received message; freed here.
         *
         * Returns 1 in every visible path.
         */
        int
        messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
        {
            char* payloadptr;
            int i;

            (void)context;
            (void)topicLen;

            printf("Message arrived\n");
            printf("     topic: %s\n", topicName);
            printf("   message: ");
            /* The payload is not NUL-terminated, so print it byte by byte. */
            payloadptr = message->payload;
            for (i = 0; i < message->payloadlen; i++)
            {
                putchar(*payloadptr++);
            }
            putchar('\n');

            /* … (application-specific handling elided in the original post) … */

            MQTTAsync_freeMessage(&message);
            MQTTAsync_free(topicName);
            return 1;
        }