Hello,
I’ve integrated the paho.c library into one
of my programs on Ubuntu 10.04 LTS (kernel 2.6 with the preempt
patches), using it to connect to a Mosquitto broker running on
the same machine. I am using the asynchronous API. To build
the paho.c library, I cloned the git repository and built it from
source. I use the library to both subscribe and publish on
topics with the broker.
The program that uses the Paho library frequently crashes
with the error:
pthread_mutex_lock.c:62: __pthread_mutex_lock: Assertion
`mutex->__data.__owner == 0' failed.
Does anyone have any experience with this
error? My program has no threads or mutexes, so the error must
be coming from Paho. I’ve tried various solutions, to no avail.
Sometimes the program runs for 6 hours, sometimes only 20 seconds.
The relevant source code is below, which largely replicates
the async examples.
Any insight is appreciated. Thank you in
advance.
Tom
#include <stdio.h>
#include <stdlib.h>

#include <MQTTAsync.h>
// Broker URI handed to MQTTAsync_create().
#define ADDRESS "tcp://localhost:1883"
// Client identifier handed to MQTTAsync_create().
#define CLIENTID "MyClientId"
// Quality-of-service level set on every message published by
// mqCoreLink_sendMessage().
#define QOS 1
// NOTE(review): not referenced in the code shown here; presumably a
// timeout in milliseconds as in the Paho samples -- confirm before use.
#define TIMEOUT 10000L
// Client handle shared by every function in this file.
static MQTTAsync client_;
// Set to TRUE by onConnect() and back to FALSE by the connection-lost,
// disconnect and connect-failure callbacks.
// NOTE(review): the callbacks are presumably invoked from a thread owned
// by the Paho library, so this flag may be shared between threads without
// synchronization -- confirm.
static BOOL isConnected_ = FALSE;
//
// Trace sink registered through MQTTAsync_setTraceCallback().
// Prints every trace record to stdout with a "TRACE: " prefix.
//
// level   - severity of the record (not used for filtering here)
// message - trace text; a NULL pointer is printed as an empty string
//
void
mqTraceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
{
    (void)level;

    // The format string had been split across two source lines, which
    // left an unterminated string literal; it is rejoined here.
    printf("TRACE: %s\n", message ? message : "");
}
//
// Start the client connection to the MQTT
broker.
//
BOOL
mqCoreLink_start(void)
{
int rc;
MQTTAsync_connectOptions
conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_create(
&client_,
ADDRESS,
CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE,
NULL );
MQTTAsync_setCallbacks(
client_,
// client handle
NULL,
// context (will be passed to every
callback function)
connlost,
// pointer to connection lost callback
messageArrived, // pointer to
message-received callback
NULL
); // pointer to delivery-complete callback
conn_opts.keepAliveInterval
= 20;
conn_opts.cleansession = 1;
conn_opts._onSuccess_ =
onConnect;
conn_opts._onFailure_ =
onConnectFailure;
conn_opts.context =
client_;
rc =
MQTTAsync_connect(client_, &conn_opts);
if ( rc !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
return
FALSE;
}
//
// Create the default
command interface.
//
mqCoreLink_registerSignal("_command", NULL);
//
// Enable tracing
// If no callback is set
(NULL), the trace values
// are not broadcast.
//
MQTTAsync_setTraceCallback
(&mqTraceCallback);
// The default is minimum.
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
return TRUE;
}
void
mqCoreLink_stop(void)
{
MQTTAsync_destroy(&client_);
}
BOOL
mqCoreLink_sendMessage(char* topic, char*
msg, int msgLen)
{
int rc;
BOOL ret = FALSE;
MQTTAsync_responseOptions
opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg =
MQTTAsync_message_initializer;
opts._onSuccess_ = onSend;
opts.context = client_;
pubmsg.payload = msg;
pubmsg.payloadlen = msgLen;
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
rc =
MQTTAsync_sendMessage(client_, topic, &pubmsg, &opts);
if ( rc !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, \n"
"\t%s\n"
"\treturn code %d\n", topic, rc);
goto quit;
}
ret = TRUE;
quit:
return ret;
}
void
connlost(void *context, char *cause)
{
MQTTAsync client =
(MQTTAsync)context;
MQTTAsync_connectOptions
conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection
lost\n");
printf(" cause: %s\n",
cause);
printf("Reconnecting\n");
isConnected_ = FALSE;
conn_opts.keepAliveInterval
= 20;
conn_opts.cleansession = 1;
if ((rc =
MQTTAsync_connect(client, &conn_opts)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
}
}
//
// Success callback for a disconnect request: reports the disconnection
// and clears the connected flag.
//
// context  - callback context (unused)
// response - success data from the library (unused)
//
void
onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    // The literal had been split across two source lines; rejoined.
    printf("Successful disconnection\n");
    isConnected_ = FALSE;
}
//
// Success callback for a publish started by mqCoreLink_sendMessage().
// Intentionally does nothing; the diagnostic below is disabled.
//
// context  - callback context (unused)
// response - success data from the library (unused)
//
void
onSend(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    // The disabled statement is kept on one line: when it wrapped, its
    // second half fell outside the comment and became invalid code.
    // printf("Message with token value %d delivery confirmed\n", response->token);
}
////////////////////////////////////////////////////////////
// Subscription callbacks.
////////////////////////////////////////////////////////////
//
// Success callback for the subscribe request issued in onConnect().
//
// context  - callback context (unused)
// response - success data from the library (unused)
//
void
onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    // The literal had been split across two source lines; rejoined.
    printf("Subscribe succeeded\n");
}
//
// Failure callback for the subscribe request issued in onConnect().
// Prints the library's failure code, or 0 when no failure data is given.
//
// context  - callback context (unused)
// response - failure data from the library; may be NULL
//
void
onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    // The literal had been split across two source lines; rejoined.
    printf("Subscribe failed, rc %d\n", response ? response->code : 0);
}
////////////////////////////////////////////////////////////
// Connection Callbacks
////////////////////////////////////////////////////////////
//
// Failure callback for a connect request. Prints the library's failure
// code (0 when no failure data is given) and clears the connected flag.
//
// context  - callback context (unused)
// response - failure data from the library; may be NULL
//
void
onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    // The literal had been split across two source lines; rejoined.
    printf("Connect failed, rc %d\n", response ? response->code : 0);
    isConnected_ = FALSE;
}
//
// Called back from the paho mqtt library
upon successful connection
// to the MQTT broker. We subscribe all
registered topic names at this
// point.
//
void
onConnect(void* context,
MQTTAsync_successData* response)
{
int topicCount;
int tmpCount;
MQTTAsync client =
(MQTTAsync)context;
MQTTAsync_responseOptions
opts = MQTTAsync_responseOptions_initializer;
int rc;
int i;
CoreLinkEvent*
cle_elt;
isConnected_ = TRUE;
topicCount = 0;
DL_FOREACH(coreLinkEvents_head_,cle_elt)
{
if (
cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
topicCount += 1;
}
topicNames_ = alloca(sizeof
*topicNames_ * topicCount);
qos_ = alloca( (topicCount
+ 1) * sizeof(int) );
for (i = 0; i <
topicCount; ++i)
{
topicNames_[i] = alloca(CORELINK_MAXTOPICLEN);
}
//
// Copy in all registered
topic names
//
tmpCount = 0;
DL_FOREACH(coreLinkEvents_head_,cle_elt)
{
if (
cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
{
strncpy(
topicNames_[tmpCount],
cle_elt->topic,
CORELINK_MAXTOPICLEN );
tmpCount += 1;
}
}
//
// Make a quality of
service array
//
for
(i=0;i<topicCount;++i)
{
qos_[i] =
1;
printf("subscribe: %s\n", &(*topicNames_[i]) );
}
//
// Subscribe to all topics
//
printf("Successful
connection\n");
printf("Subscribing to many
topics..\n");
opts._onSuccess_ =
onSubscribe;
opts._onFailure_ =
onSubscribeFailure;
opts.context = client;
rc =
MQTTAsync_subscribeMany(
client_,
topicCount,
topicNames_,
&(qos_[0]),
&opts);
if ( rc !=
MQTTASYNC_SUCCESS )
{
printf("Failed to subscribe to topics! %d\n", rc);
}
}
//////////////////////////////////////////////////////////////////////////
// Handle receiving messages from the MQTT
broker
// Generally, this means that a user
interface running in a web-browser
// has published to a topic listed in the
cmdTopics[] array.
//////////////////////////////////////////////////////////////////////////
int
messageArrived(
void *context, char
*topicName, int topicLen, MQTTAsync_message *message)
{
char* payloadptr;
int i;
printf("Message
arrived\n");
printf("
topic: %s\n", topicName);
printf("
message: ");
payloadptr =
message->payload;
for(i=0;
i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
…
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}