Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[paho-dev] pthread_mutex_lock

Hello,

 

I’ve integrated the paho.c library into one of my programs on Ubuntu 10.04LTS kernel 2.6 with the preempt patches, using it to connect to a mosquito broker running on the same machine. I am using the asynchronous API. To build the paho.c library, I cloned the git repository and built from source. I use the library to both subscribe and publish on topics with the broker.

 

The paho library program frequently crashes with the error:

pthread_mutex_lock.c : 62 : __pthread_mutex_lock: Assertion `mutex- __data.__owner = 0` Failed

 

Does anyone have any experience with this error? My program has no threads or mutexes, so the error must be coming from Paho. I’ve tried various solutions to no avail. Sometime the program runs for 6 shours, sometimes 20 seconds. The relevant source code is below, which largely replicates the async examples.

 

Any insight is appreciated. Thank you in advance.

 

Tom

 

#include <MQTTAsync.h>

 

#define ADDRESS     "tcp://localhost:1883"

#define CLIENTID    "MyClientId"

#define QOS         1

#define TIMEOUT     10000L

 

static MQTTAsync client_;

static BOOL isConnected_ = FALSE;

 

 

 

void

mqTraceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)

{

printf("TRACE: %s\n", message);

}

 

//

// Start the client connection to the MQTT broker.

//

BOOL

mqCoreLink_start(void)

{

                int rc;

                MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;

               

                MQTTAsync_create(

                                &client_,

                                ADDRESS,

                                CLIENTID,

                                MQTTCLIENT_PERSISTENCE_NONE,

                                NULL );

                               

                MQTTAsync_setCallbacks(

                                client_,                 // client handle

                                NULL,                    // context (will be passed to every callback function)

                                connlost,             // pointer to connection lost callback

                                messageArrived,                              // pointer to message-received callback

                                NULL );                 // pointer to delivery-complete callback

                               

                conn_opts.keepAliveInterval = 20;

                conn_opts.cleansession = 1;

                conn_opts._onSuccess_ = onConnect;

                conn_opts._onFailure_ = onConnectFailure;

                conn_opts.context = client_;

               

                rc = MQTTAsync_connect(client_, &conn_opts);

                if ( rc != MQTTASYNC_SUCCESS)

                {

                                printf("Failed to start connect, return code %d\n", rc);

                                return FALSE;     

                }

               

                //

                // Create the default command interface.

                //

                mqCoreLink_registerSignal("_command", NULL);

 

 

 

                //

                // Enable tracing

                // If no callback is set (NULL), the trace values

                // are not broadcast.

                //

                MQTTAsync_setTraceCallback (&mqTraceCallback);

                // The default is minimum.

                MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);

               

 

 

                return TRUE;

}

 

 

void

mqCoreLink_stop(void)

{

                MQTTAsync_destroy(&client_);

}

 

 

BOOL

mqCoreLink_sendMessage(char* topic, char* msg, int msgLen)

{

                int rc;    

                BOOL ret = FALSE;

                MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;

                MQTTAsync_message pubmsg = MQTTAsync_message_initializer;

                                 

                opts._onSuccess_ = onSend;

                opts.context = client_;

                pubmsg.payload = msg;

                pubmsg.payloadlen = msgLen;

                pubmsg.qos = QOS;

                pubmsg.retained = 0;

                deliveredtoken = 0;

 

                rc = MQTTAsync_sendMessage(client_, topic, &pubmsg, &opts);

                if ( rc != MQTTASYNC_SUCCESS)

                {

                                printf("Failed to start sendMessage, \n"

                                                "\t%s\n"

                                                "\treturn code %d\n", topic, rc);

                                goto quit;

                }             

               

               

                ret = TRUE;

               

quit:

                return ret;

}

 

 

 

void

connlost(void *context, char *cause)

{

                MQTTAsync client = (MQTTAsync)context;

                MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;

                int rc;

               

                printf("\nConnection lost\n");

                printf("     cause: %s\n", cause);

                printf("Reconnecting\n");

               

                isConnected_ = FALSE;

               

                conn_opts.keepAliveInterval = 20;

                conn_opts.cleansession = 1;

                if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)

                {

                                                printf("Failed to start connect, return code %d\n", rc);

                }

}

 

 

void

onDisconnect(void* context, MQTTAsync_successData* response)

{

                printf("Successful disconnection\n");

                isConnected_ = FALSE;

}

 

 

void

onSend(void* context, MQTTAsync_successData* response)

{

                // printf("Message with token value %d delivery confirmed\n", response->token);

}

 

 

////////////////////////////////////////////////////////////

// Subscription callbacks.

////////////////////////////////////////////////////////////

 

void

onSubscribe(void* context, MQTTAsync_successData* response)

{

                printf("Subscribe succeeded\n");

}

 

 

void

onSubscribeFailure(void* context, MQTTAsync_failureData* response)

{

                printf("Subscribe failed, rc %d\n", response ? response->code : 0);

}

 

 

////////////////////////////////////////////////////////////

// Connection Callbacks

////////////////////////////////////////////////////////////

 

 

void

onConnectFailure(void* context, MQTTAsync_failureData* response)

{

                printf("Connect failed, rc %d\n", response ? response->code : 0);

                isConnected_ = FALSE;

}

 

 

 

//

// Called back from the paho mqtt library upon successful connection

// to the MQTT broker. We subscribe all registered topic names at this

// point.

//

void

onConnect(void* context, MQTTAsync_successData* response)

{

int topicCount;

                int tmpCount;

               

                MQTTAsync client = (MQTTAsync)context;

                MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;

                int rc;

                int i;

               

                CoreLinkEvent* cle_elt;               

               

                isConnected_ = TRUE;

               

                topicCount = 0;

                DL_FOREACH(coreLinkEvents_head_,cle_elt)

                {

                                if ( cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )

                                                topicCount += 1;

                }

               

                topicNames_ = alloca(sizeof *topicNames_ * topicCount);

                qos_ = alloca( (topicCount + 1) * sizeof(int) );

 

                for (i = 0; i < topicCount; ++i)

                {

                                topicNames_[i] = alloca(CORELINK_MAXTOPICLEN);

                }

               

 

                //

                // Copy in all registered topic names

                //

                tmpCount = 0;

                DL_FOREACH(coreLinkEvents_head_,cle_elt)

                {

                                if ( cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )

                                {

                                                strncpy(

                                                                topicNames_[tmpCount],

                                                                cle_elt->topic,

                                                                CORELINK_MAXTOPICLEN );

                                                tmpCount += 1;

                                }

                }

               

                               

                //

                // Make a quality of service array

                //

                for (i=0;i<topicCount;++i)

                {

                                qos_[i] = 1;

                                printf("subscribe: %s\n", &(*topicNames_[i]) );

                }

               

 

                //

                // Subscribe to all topics

                //

 

                printf("Successful connection\n");

                printf("Subscribing to many topics..\n");

               

                opts._onSuccess_ = onSubscribe;

                opts._onFailure_ = onSubscribeFailure;

                opts.context = client;

 

                rc = MQTTAsync_subscribeMany(

                                client_,

                                topicCount,

                                topicNames_,

                                &(qos_[0]),

                                &opts);

                               

                if ( rc != MQTTASYNC_SUCCESS )

                {

                                printf("Failed to subscribe to topics! %d\n", rc);

                }

}

 

 

 

//////////////////////////////////////////////////////////////////////////

// Handle receiving messages from the MQTT broker

// Generally, this means that a user interface running in a web-browser

// has published to a topic listed in the cmdTopics[] array.

//////////////////////////////////////////////////////////////////////////

 

int

messageArrived(

                void *context, char *topicName, int topicLen, MQTTAsync_message *message)

{

char* payloadptr;

                int i;

                               

               

printf("Message arrived\n");

printf("     topic: %s\n", topicName);

printf("   message: ");

payloadptr = message->payload;

for(i=0; i<message->payloadlen; i++)

{

putchar(*payloadptr++);

}

putchar('\n');

               

 

 

MQTTAsync_freeMessage(&message);

MQTTAsync_free(topicName);

return 1;

}

 


Back to the top