Hello,
I’ve integrated the paho.c library into one of my programs on Ubuntu 10.04 LTS (kernel 2.6 with the preempt patches), using it to connect to a Mosquitto broker running on the same machine. I am using the asynchronous API. To build the paho.c
library, I cloned the git repository and built from source. I use the library to both subscribe and publish on topics with the broker.
The program using the Paho library frequently crashes with the error:
pthread_mutex_lock.c:62: __pthread_mutex_lock: Assertion `mutex->__data.__owner == 0' failed.
Does anyone have any experience with this error? My program has no threads or mutexes, so the error must be coming from Paho. I’ve tried various solutions to no avail. Sometimes the program runs for 6 hours, sometimes 20 seconds. The relevant
source code is below, which largely replicates the async examples.
Any insight is appreciated. Thank you in advance.
Tom
#include <stdio.h>
#include <stdlib.h>

#include <MQTTAsync.h>
// Broker URI handed to MQTTAsync_create() in mqCoreLink_start().
#define ADDRESS "tcp://localhost:1883"
// Client identifier handed to MQTTAsync_create() in mqCoreLink_start().
#define CLIENTID "MyClientId"
// Quality-of-service level applied to every message published by
// mqCoreLink_sendMessage().
#define QOS 1
// NOTE(review): TIMEOUT is not referenced by any code visible in this
// excerpt -- confirm whether it is still needed.
#define TIMEOUT 10000L
// The single MQTT client handle; created in mqCoreLink_start() and
// destroyed in mqCoreLink_stop().
static MQTTAsync client_;
// Connection flag: set TRUE in onConnect(); set FALSE in connlost(),
// onDisconnect() and onConnectFailure().
// NOTE(review): those writers are library callbacks; if the library invokes
// them on its own thread, this plain flag is shared across threads without
// synchronization -- confirm.
static BOOL isConnected_ = FALSE;
// Trace sink registered with MQTTAsync_setTraceCallback(): writes every
// trace line to stdout with a "TRACE: " prefix.
//
// level   - severity of the trace entry (not used).
// message - text of the trace entry.
void
mqTraceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
{
    (void)level;
    fprintf(stdout, "TRACE: %s\n", message);
}
//
// Start the client connection to the MQTT broker.
//
BOOL
mqCoreLink_start(void)
{
int rc;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_create(
&client_,
ADDRESS,
CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE,
NULL );
MQTTAsync_setCallbacks(
client_, // client handle
NULL, // context (will be passed to every callback function)
connlost, // pointer to connection lost callback
messageArrived, // pointer to message-received callback
NULL ); // pointer to delivery-complete callback
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts._onSuccess_ = onConnect;
conn_opts._onFailure_ = onConnectFailure;
conn_opts.context = client_;
rc = MQTTAsync_connect(client_, &conn_opts);
if ( rc != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
return FALSE;
}
//
// Create the default command interface.
//
mqCoreLink_registerSignal("_command", NULL);
//
// Enable tracing
// If no callback is set (NULL), the trace values
// are not broadcast.
//
MQTTAsync_setTraceCallback (&mqTraceCallback);
// The default is minimum.
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
return TRUE;
}
void
mqCoreLink_stop(void)
{
MQTTAsync_destroy(&client_);
}
BOOL
mqCoreLink_sendMessage(char* topic, char* msg, int msgLen)
{
int rc;
BOOL ret = FALSE;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
opts._onSuccess_ = onSend;
opts.context = client_;
pubmsg.payload = msg;
pubmsg.payloadlen = msgLen;
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
rc = MQTTAsync_sendMessage(client_, topic, &pubmsg, &opts);
if ( rc != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, \n"
"\t%s\n"
"\treturn code %d\n", topic, rc);
goto quit;
}
ret = TRUE;
quit:
return ret;
}
void
connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
isConnected_ = FALSE;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
}
}
// Disconnect-success callback: reports the disconnection on stdout and
// clears the connection flag. Neither argument is used.
void
onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    fputs("Successful disconnection\n", stdout);
    isConnected_ = FALSE;
}
// Publish-success callback installed by mqCoreLink_sendMessage().
// Intentionally a no-op: delivery confirmations are not logged.
void
onSend(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;
}
////////////////////////////////////////////////////////////
// Subscription callbacks.
////////////////////////////////////////////////////////////

// Subscribe-success callback: reports the success on stdout.
// Neither argument is used.
void
onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    fputs("Subscribe succeeded\n", stdout);
}
// Subscribe-failure callback: prints the failure code carried by
// `response`, or 0 when no response data was supplied.
void
onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    int failureCode = 0;

    (void)context;

    if (response != NULL)
    {
        failureCode = response->code;
    }
    printf("Subscribe failed, rc %d\n", failureCode);
}
////////////////////////////////////////////////////////////
// Connection Callbacks
////////////////////////////////////////////////////////////

// Connect-failure callback: prints the failure code carried by `response`
// (0 when no response data was supplied) and clears the connection flag.
void
onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    int failureCode = 0;

    (void)context;

    if (response != NULL)
    {
        failureCode = response->code;
    }
    printf("Connect failed, rc %d\n", failureCode);
    isConnected_ = FALSE;
}
//
// Called back from the paho mqtt library upon successful connection
// to the MQTT broker. We subscribe all registered topic names at this
// point.
//
// context  - client handle supplied with the connect options; when it is
//            NULL the file-scope client_ handle is used instead.
// response - not used.
//
// The topic and QoS arrays are built in heap scratch buffers that live only
// for the duration of this call. Previously they came from alloca() and were
// stored in non-local variables, which left dangling pointers behind and put
// topicCount * CORELINK_MAXTOPICLEN bytes on the callback's stack.
// NOTE(review): the buffers are released right after
// MQTTAsync_subscribeMany() returns; this relies on the library copying the
// topic strings during the call, exactly as the alloca() version did.
//
void
onConnect(void* context, MQTTAsync_successData* response)
{
    MQTTAsync client = (context != NULL) ? (MQTTAsync)context : client_;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    CoreLinkEvent* cle_elt;
    char** topics = NULL;
    char* storage = NULL;
    int* qos = NULL;
    int topicCount = 0;
    int copied = 0;
    int rc;

    (void)response;
    isConnected_ = TRUE;

    // First pass: count the registered signal topics.
    DL_FOREACH(coreLinkEvents_head_, cle_elt)
    {
        if ( cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
        {
            topicCount += 1;
        }
    }

    // Nothing registered: a SUBSCRIBE without topics is not meaningful.
    if ( topicCount == 0 )
    {
        printf("Successful connection\n");
        return;
    }

    topics = malloc((size_t)topicCount * sizeof *topics);
    qos = malloc((size_t)topicCount * sizeof *qos);
    storage = malloc((size_t)topicCount * (size_t)CORELINK_MAXTOPICLEN);
    if ( topics == NULL || qos == NULL || storage == NULL )
    {
        printf("Failed to subscribe to topics! out of memory\n");
        goto cleanup;
    }

    //
    // Second pass: copy in all registered topic names and build the
    // quality of service array. The walk is capped at topicCount so that a
    // list which grew since the first pass cannot overrun the buffers.
    //
    DL_FOREACH(coreLinkEvents_head_, cle_elt)
    {
        if ( copied >= topicCount )
        {
            break;
        }
        if ( cle_elt->eventType != CORELINKEVENTTYPE_SIGNAL )
        {
            continue;
        }
        topics[copied] = storage + (size_t)copied * (size_t)CORELINK_MAXTOPICLEN;
        // snprintf always NUL-terminates, unlike strncpy on an over-long topic.
        snprintf(topics[copied], (size_t)CORELINK_MAXTOPICLEN, "%s", cle_elt->topic);
        qos[copied] = QOS;
        printf("subscribe: %s\n", topics[copied]);
        copied += 1;
    }

    printf("Successful connection\n");
    if ( copied == 0 )
    {
        // The list shrank between the two passes; nothing left to subscribe.
        goto cleanup;
    }

    //
    // Subscribe to all topics
    //
    printf("Subscribing to many topics..\n");
    opts.onSuccess = onSubscribe;
    opts.onFailure = onSubscribeFailure;
    opts.context = client;
    rc = MQTTAsync_subscribeMany(client, copied, topics, qos, &opts);
    if ( rc != MQTTASYNC_SUCCESS )
    {
        printf("Failed to subscribe to topics! %d\n", rc);
    }

cleanup:
    free(storage);
    free(qos);
    free(topics);
}
//////////////////////////////////////////////////////////////////////////
// Handle receiving messages from the MQTT broker
// Generally, this means that a user interface running in a web-browser
// has published to a topic listed in the cmdTopics[] array.
//
// Installed as the message-received callback by mqCoreLink_start().
//
// context   - callback context (not used in the visible code).
// topicName - topic the message was published on; released here with
//             MQTTAsync_free().
// topicLen  - length of the topic name (not consulted in the visible code).
// message   - the received message; released here with
//             MQTTAsync_freeMessage().
//
// Always returns 1.
// NOTE(review): per the Paho callback contract a non-zero return presumably
// tells the library the message was handled -- confirm against the library
// version in use.
//////////////////////////////////////////////////////////////////////////
int
messageArrived(
void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
char* payloadptr;
int i;
printf("Message arrived\n");
// NOTE(review): the topic is printed as a NUL-terminated string; topicLen
// is ignored, so a topic with embedded NUL characters would be cut short.
printf(" topic: %s\n", topicName);
printf(" message: ");
// The payload is written one byte at a time for exactly payloadlen bytes,
// so it does not need to be NUL-terminated.
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
// NOTE(review): the next line is an elision marker from the posted excerpt,
// not C code; the omitted handling is not visible here.
…
// Both the message and the topic string are released before returning;
// neither may be used after this point.
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
_______________________________________________
paho-dev mailing list
paho-dev@xxxxxxxxxxx
https://dev.eclipse.org/mailman/listinfo/paho-dev