Hello,
I’ve integrated the paho.c library
into one of my programs on Ubuntu 10.04 LTS (kernel 2.6
with the preempt patches), using it to connect to a
Mosquitto broker running on the same machine. I am
using the asynchronous API. To build the paho.c
library, I cloned the git repository and built from
source. I use the library to both subscribe and
publish on topics with the broker.
The program that uses the paho library frequently
crashes with the error:
pthread_mutex_lock.c:62: __pthread_mutex_lock: Assertion `mutex->__data.__owner == 0' failed.
Does anyone have any experience
with this error? My program has no threads or mutexes,
so the error must be coming from Paho. I’ve tried
various solutions to no avail. Sometimes the program
runs for 6 hours, sometimes 20 seconds. The relevant
source code is below, which largely replicates the
async examples.
Any insight is appreciated. Thank
you in advance.
Tom
#include <stdlib.h>

#include <MQTTAsync.h>
// Broker endpoint and client settings used by this module.
// ADDRESS must stay on one line with its value: a bare "#define ADDRESS"
// followed by the string on the next line defines an empty macro and
// leaves a stray string literal at file scope.
#define ADDRESS  "tcp://localhost:1883"
#define CLIENTID "MyClientId"
#define QOS      1       // QoS level applied to published messages
#define TIMEOUT  10000L
static MQTTAsync client_;
static BOOL isConnected_ = FALSE;
//
// Trace sink registered with MQTTAsync_setTraceCallback(): writes each
// trace message from the library to stdout with a "TRACE: " prefix.
//
//   level   - severity of the trace entry (not used here)
//   message - trace text to print
//
void mqTraceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
{
    (void)level;
    printf("TRACE: %s\n", message);
}
//
// Start the client connection to
the MQTT broker.
//
BOOL
mqCoreLink_start(void)
{
int rc;
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer;
MQTTAsync_create(
&client_,
ADDRESS,
CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE,
NULL );
MQTTAsync_setCallbacks(
client_, // client handle
NULL, // context (will be passed to
every callback function)
connlost, // pointer to connection lost
callback
messageArrived, //
pointer to message-received callback
NULL ); // pointer to
delivery-complete callback
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts._onSuccess_
= onConnect;
conn_opts._onFailure_
= onConnectFailure;
conn_opts.context =
client_;
rc =
MQTTAsync_connect(client_, &conn_opts);
if ( rc !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n",
rc);
return FALSE;
}
//
// Create the
default command interface.
//
mqCoreLink_registerSignal("_command", NULL);
//
// Enable tracing
// If no callback
is set (NULL), the trace values
// are not
broadcast.
//
MQTTAsync_setTraceCallback (&mqTraceCallback);
// The default is
minimum.
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
return TRUE;
}
//
// Release the MQTT client handle held in client_.
//
// NOTE(review): no MQTTAsync_disconnect() is issued before the handle is
// destroyed - confirm that this is intended.
//
void mqCoreLink_stop(void)
{
    MQTTAsync_destroy(&client_);
}
BOOL
mqCoreLink_sendMessage(char* topic,
char* msg, int msgLen)
{
int rc;
BOOL ret = FALSE;
MQTTAsync_responseOptions opts =
MQTTAsync_responseOptions_initializer;
MQTTAsync_message
pubmsg = MQTTAsync_message_initializer;
opts._onSuccess_ =
onSend;
opts.context =
client_;
pubmsg.payload =
msg;
pubmsg.payloadlen =
msgLen;
pubmsg.qos = QOS;
pubmsg.retained =
0;
deliveredtoken = 0;
rc =
MQTTAsync_sendMessage(client_, topic, &pubmsg,
&opts);
if ( rc !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, \n"
"\t%s\n"
"\treturn code %d\n", topic, rc);
goto quit;
}
ret = TRUE;
quit:
return ret;
}
void
connlost(void *context, char
*cause)
{
MQTTAsync client =
(MQTTAsync)context;
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
printf(" cause:
%s\n", cause);
printf("Reconnecting\n");
isConnected_ =
FALSE;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc =
MQTTAsync_connect(client, &conn_opts)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n",
rc);
}
}
//
// Success callback for a disconnect request: reports the disconnection
// and clears the connection flag.
//
//   context  - callback context (unused)
//   response - success data from the library (unused)
//
void onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Successful disconnection\n");
    isConnected_ = FALSE;
}
//
// Success callback for a publish request. Intentionally does nothing;
// the delivery report below is kept disabled.
//
//   context  - callback context (unused)
//   response - success data from the library (unused)
//
void onSend(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    // printf("Message with token value %d delivery confirmed\n", response->token);
}
////////////////////////////////////////////////////////////
// Subscription callbacks.
////////////////////////////////////////////////////////////

//
// Success callback for a subscribe request: reports the success.
//
//   context  - callback context (unused)
//   response - success data from the library (unused)
//
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Subscribe succeeded\n");
}
//
// Failure callback for a subscribe request: prints the failure code, or 0
// when the library supplied no failure data.
//
//   context  - callback context (unused)
//   response - failure data from the library; may be NULL
//
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    int code = 0;

    (void)context;

    if (response != NULL)
    {
        code = response->code;
    }
    printf("Subscribe failed, rc %d\n", code);
}
////////////////////////////////////////////////////////////
// Connection Callbacks
////////////////////////////////////////////////////////////

//
// Failure callback for a connect request: prints the failure code (0 when
// the library supplied no failure data) and clears the connection flag.
//
//   context  - callback context (unused)
//   response - failure data from the library; may be NULL
//
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    int code = 0;

    (void)context;

    if (response != NULL)
    {
        code = response->code;
    }
    printf("Connect failed, rc %d\n", code);

    isConnected_ = FALSE;
}
//
// Called back from the paho mqtt
library upon successful connection
// to the MQTT broker. We subscribe
all registered topic names at this
// point.
//
void
onConnect(void* context,
MQTTAsync_successData* response)
{
int
topicCount;
int tmpCount;
MQTTAsync client =
(MQTTAsync)context;
MQTTAsync_responseOptions opts =
MQTTAsync_responseOptions_initializer;
int rc;
int i;
CoreLinkEvent*
cle_elt;
isConnected_ =
TRUE;
topicCount = 0;
DL_FOREACH(coreLinkEvents_head_,cle_elt)
{
if
( cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
topicCount += 1;
}
topicNames_ =
alloca(sizeof *topicNames_ * topicCount);
qos_ = alloca(
(topicCount + 1) * sizeof(int) );
for (i = 0; i <
topicCount; ++i)
{
topicNames_[i] = alloca(CORELINK_MAXTOPICLEN);
}
//
// Copy in all
registered topic names
//
tmpCount = 0;
DL_FOREACH(coreLinkEvents_head_,cle_elt)
{
if
( cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
{
strncpy(
topicNames_[tmpCount],
cle_elt->topic,
CORELINK_MAXTOPICLEN );
tmpCount += 1;
}
}
//
// Make a quality
of service array
//
for
(i=0;i<topicCount;++i)
{
qos_[i] = 1;
printf("subscribe: %s\n", &(*topicNames_[i]) );
}
//
// Subscribe to all
topics
//
printf("Successful
connection\n");
printf("Subscribing
to many topics..\n");
opts._onSuccess_ =
onSubscribe;
opts._onFailure_ =
onSubscribeFailure;
opts.context =
client;
rc =
MQTTAsync_subscribeMany(
client_,
topicCount,
topicNames_,
&(qos_[0]),
&opts);
if ( rc !=
MQTTASYNC_SUCCESS )
{
printf("Failed to subscribe to topics! %d\n", rc);
}
}
//////////////////////////////////////////////////////////////////////////
// Handle receiving messages from
the MQTT broker
// Generally, this means that a
user interface running in a web-browser
// has published to a topic listed
in the cmdTopics[] array.
//////////////////////////////////////////////////////////////////////////
int
messageArrived(
void *context, char
*topicName, int topicLen, MQTTAsync_message *message)
{
char*
payloadptr;
int i;
printf("Message
arrived\n");
printf("
topic: %s\n", topicName);
printf("
message: ");
payloadptr
= message->payload;
for(i=0;
i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
…
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
_______________________________________________
paho-dev mailing list
paho-dev@xxxxxxxxxxx
https://dev.eclipse.org/mailman/listinfo/paho-dev