Re: [paho-dev] pthread_mutex_lock

Ian,

They sent me to Hong Kong to cover for a sales trip. I'm trying to test it at night; hopefully I'll get to it today or tomorrow. I'll be doing a live test with the EtherCAT fieldbus on Monday. 

Thanks for all your help with this. I'll keep you posted. 

Thomas Bitsky Jr
(Sent from my mobile device.)

On Sep 17, 2013, at 5:24 AM, "Ian Craggs" <icraggs@xxxxxxxxxxxxxxxxxxxxxxx> wrote:

Hi Tom,

any news?

Ian

On 12/09/13 18:00, Thomas Bitsky Jr wrote:

Interesting thought. I’ll give it a try later today and see if that helps.

 

Thanks!

Thomas C. Bitsky Jr. | Lead Developer

ADC | automateddesign.com

P: 630-783-1150 F: 630-783-1159 M: 630-632-6679

 

From: paho-dev-bounces@xxxxxxxxxxx [mailto:paho-dev-bounces@xxxxxxxxxxx] On Behalf Of Ian Craggs
Sent: Thursday, September 12, 2013 11:45 AM
To: paho-dev@xxxxxxxxxxx
Subject: Re: [paho-dev] pthread_mutex_lock

 

Tom,

do you think that the program might be getting disconnected from time to time, and that might be happening on a publish?

If so, I have a theory why that would cause this problem.  You could try replacing MQTTAsync.c and MQTTAsync.h with the attached, to see if they help.

Ian


On 12/09/13 17:19, Thomas Bitsky Jr wrote:

Ian,

 

I think it's happening randomly when I publish on a topic. I'll try and run a trace on it, as well. 

 

Thanks!

 

(Sent from my mobile device.)


On Sep 12, 2013, at 10:56 AM, "Ian Craggs" <icraggs@xxxxxxxxxxxxxxxxxxxxxxx> wrote:

Hi Tom,

I've seen this occasionally, but haven't had a chance to track it down. Do you have any idea at what point in your program it's happening?

I'll try to track it down. If it's possible for you to get a trace of the time just before it fails, that would be great (but I realize how hard that might be). The best way would be to set an environment variable:

export MQTT_C_CLIENT_TRACE=ON

and let it print to stdout.  You can have it write to a file, but if the program is crashing then the last few entries are likely to be lost.
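
When the program is not started from a shell where the variable can be exported, the same setting can be applied from inside the process. The following is only a sketch: `enable_paho_trace` is a hypothetical helper name, and it assumes the library reads `MQTT_C_CLIENT_TRACE` when the first client is created, so it would have to run before `MQTTAsync_create` and before anything is printed. It also switches stdout to unbuffered mode so that trace lines printed just before a crash are less likely to be lost.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>

/*
 * Hypothetical helper: request Paho client tracing for this process.
 * Returns 0 on success, -1 if either step failed.
 */
int enable_paho_trace(void)
{
    /* Same effect as `export MQTT_C_CLIENT_TRACE=ON` in the shell. */
    if (setenv("MQTT_C_CLIENT_TRACE", "ON", 1) != 0)
        return -1;

    /* Unbuffered stdout: each trace line is written out immediately. */
    if (setvbuf(stdout, NULL, _IONBF, 0) != 0)
        return -1;

    return 0;
}
```

Unbuffered output is slower, so this is probably only worth doing while chasing the crash.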

Ian

On 12/09/13 16:24, Thomas Bitsky Jr wrote:

Hello,

 

I’ve integrated the paho.c library into one of my programs on Ubuntu 10.04 LTS (kernel 2.6 with the preempt patches), using it to connect to a Mosquitto broker running on the same machine. I am using the asynchronous API. To build the paho.c library, I cloned the git repository and built from source. I use the library to both subscribe and publish on topics with the broker.

 

The program frequently crashes inside the Paho library with the error:

pthread_mutex_lock.c:62: __pthread_mutex_lock: Assertion `mutex->__data.__owner == 0' failed.

 

Does anyone have any experience with this error? My program has no threads or mutexes of its own, so the error must be coming from Paho. I’ve tried various solutions to no avail. Sometimes the program runs for 6 hours, sometimes 20 seconds. The relevant source code is below; it largely replicates the async examples.
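
For context, this glibc assertion is checked right after a thread acquires a mutex, and it fails when the mutex's owner field is still set. That usually points to a mutex having been unlocked by a thread that did not hold it, or to the mutex memory having been overwritten. As a hedged illustration of how that kind of misuse can be surfaced (it says nothing about what Paho does internally), the sketch below uses a POSIX error-checking mutex, which reports misuse through return codes. Both helper names are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>

/* Hypothetical helper: initialise m as an error-checking mutex. */
int errorcheck_mutex_init(pthread_mutex_t *m)
{
    pthread_mutexattr_t attr;
    int rc = pthread_mutexattr_init(&attr);
    if (rc != 0)
        return rc;
    rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
    if (rc == 0)
        rc = pthread_mutex_init(m, &attr);
    pthread_mutexattr_destroy(&attr);
    return rc;
}

/*
 * Hypothetical self-check: exercises two kinds of misuse.
 * Returns 1 if both were reported through return codes, 0 otherwise.
 */
int errorcheck_mutex_demo(void)
{
    pthread_mutex_t m;
    int ok = 1;

    if (errorcheck_mutex_init(&m) != 0)
        return 0;

    /* Unlocking a mutex this thread does not hold is reported as EPERM. */
    if (pthread_mutex_unlock(&m) != EPERM)
        ok = 0;

    /* Relocking a mutex this thread already holds is reported as EDEADLK. */
    if (pthread_mutex_lock(&m) != 0)
        ok = 0;
    if (pthread_mutex_lock(&m) != EDEADLK)
        ok = 0;

    pthread_mutex_unlock(&m);
    pthread_mutex_destroy(&m);
    return ok;
}
```

A default mutex gives no such feedback: the same mistakes are undefined behaviour and may only show up much later, which would fit a crash that takes anywhere from seconds to hours to appear.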

 

Any insight is appreciated. Thank you in advance.

 

Tom

 

#include <MQTTAsync.h>

#define ADDRESS     "tcp://localhost:1883"
#define CLIENTID    "MyClientId"
#define QOS         1
#define TIMEOUT     10000L

static MQTTAsync client_;
static BOOL isConnected_ = FALSE;

void
mqTraceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
{
    printf("TRACE: %s\n", message);
}

//
// Start the client connection to the MQTT broker.
//
BOOL
mqCoreLink_start(void)
{
    int rc;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;

    MQTTAsync_create(
        &client_,
        ADDRESS,
        CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE,
        NULL );

    MQTTAsync_setCallbacks(
        client_,            // client handle
        NULL,               // context (will be passed to every callback function)
        connlost,           // pointer to connection lost callback
        messageArrived,     // pointer to message-received callback
        NULL );             // pointer to delivery-complete callback

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client_;

    rc = MQTTAsync_connect(client_, &conn_opts);
    if ( rc != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        return FALSE;
    }

    //
    // Create the default command interface.
    //
    mqCoreLink_registerSignal("_command", NULL);

    //
    // Enable tracing
    // If no callback is set (NULL), the trace values
    // are not broadcast.
    //
    MQTTAsync_setTraceCallback (&mqTraceCallback);
    // The default is minimum.
    MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);

    return TRUE;
}

void
mqCoreLink_stop(void)
{
    MQTTAsync_destroy(&client_);
}

BOOL
mqCoreLink_sendMessage(char* topic, char* msg, int msgLen)
{
    int rc;
    BOOL ret = FALSE;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;

    opts.onSuccess = onSend;
    opts.context = client_;
    pubmsg.payload = msg;
    pubmsg.payloadlen = msgLen;
    pubmsg.qos = QOS;
    pubmsg.retained = 0;
    deliveredtoken = 0;

    rc = MQTTAsync_sendMessage(client_, topic, &pubmsg, &opts);
    if ( rc != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start sendMessage, \n"
            "\t%s\n"
            "\treturn code %d\n", topic, rc);
        goto quit;
    }

    ret = TRUE;

quit:
    return ret;
}

void
connlost(void *context, char *cause)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;

    printf("\nConnection lost\n");
    printf("     cause: %s\n", cause);
    printf("Reconnecting\n");

    isConnected_ = FALSE;

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
    }
}

void
onDisconnect(void* context, MQTTAsync_successData* response)
{
    printf("Successful disconnection\n");
    isConnected_ = FALSE;
}

void
onSend(void* context, MQTTAsync_successData* response)
{
    // printf("Message with token value %d delivery confirmed\n", response->token);
}

////////////////////////////////////////////////////////////
// Subscription callbacks.
////////////////////////////////////////////////////////////

void
onSubscribe(void* context, MQTTAsync_successData* response)
{
    printf("Subscribe succeeded\n");
}

void
onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    printf("Subscribe failed, rc %d\n", response ? response->code : 0);
}

////////////////////////////////////////////////////////////
// Connection Callbacks
////////////////////////////////////////////////////////////

void
onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    printf("Connect failed, rc %d\n", response ? response->code : 0);
    isConnected_ = FALSE;
}

//
// Called back from the paho mqtt library upon successful connection
// to the MQTT broker. We subscribe all registered topic names at this
// point.
//
void
onConnect(void* context, MQTTAsync_successData* response)
{
    int topicCount;
    int tmpCount;

    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    int rc;
    int i;

    CoreLinkEvent* cle_elt;

    isConnected_ = TRUE;

    topicCount = 0;
    DL_FOREACH(coreLinkEvents_head_,cle_elt)
    {
        if ( cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
            topicCount += 1;
    }

    topicNames_ = alloca(sizeof *topicNames_ * topicCount);
    qos_ = alloca( (topicCount + 1) * sizeof(int) );

    for (i = 0; i < topicCount; ++i)
    {
        topicNames_[i] = alloca(CORELINK_MAXTOPICLEN);
    }

    //
    // Copy in all registered topic names
    //
    tmpCount = 0;
    DL_FOREACH(coreLinkEvents_head_,cle_elt)
    {
        if ( cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
        {
            strncpy(
                topicNames_[tmpCount],
                cle_elt->topic,
                CORELINK_MAXTOPICLEN );
            tmpCount += 1;
        }
    }

    //
    // Make a quality of service array
    //
    for (i=0;i<topicCount;++i)
    {
        qos_[i] = 1;
        printf("subscribe: %s\n", &(*topicNames_[i]) );
    }

    //
    // Subscribe to all topics
    //

    printf("Successful connection\n");
    printf("Subscribing to many topics..\n");

    opts.onSuccess = onSubscribe;
    opts.onFailure = onSubscribeFailure;
    opts.context = client;

    rc = MQTTAsync_subscribeMany(
        client_,
        topicCount,
        topicNames_,
        &(qos_[0]),
        &opts);

    if ( rc != MQTTASYNC_SUCCESS )
    {
        printf("Failed to subscribe to topics! %d\n", rc);
    }
}

//////////////////////////////////////////////////////////////////////////
// Handle receiving messages from the MQTT broker
// Generally, this means that a user interface running in a web-browser
// has published to a topic listed in the cmdTopics[] array.
//////////////////////////////////////////////////////////////////////////

int
messageArrived(
    void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    char* payloadptr;
    int i;

    printf("Message arrived\n");
    printf("     topic: %s\n", topicName);
    printf("   message: ");
    payloadptr = message->payload;
    for(i=0; i<message->payloadlen; i++)
    {
        putchar(*payloadptr++);
    }
    putchar('\n');

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
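
One detail in the onConnect listing above: strncpy does not write a terminating null byte when the source is at least as long as the limit, so a topic of CORELINK_MAXTOPICLEN characters or more would leave its topicNames_ entry unterminated. Whether that has anything to do with the crash is unknown. The sketch below only shows a bounded copy that always terminates; `copy_topic` is a hypothetical helper, not part of Paho or of the program above.

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: copy src into dst (capacity cap bytes), truncating
 * if necessary and always null-terminating when cap is non-zero.
 * Returns 1 if the whole string fit, 0 if it was truncated or cap is 0.
 */
int copy_topic(char *dst, const char *src, size_t cap)
{
    size_t len;

    if (cap == 0)
        return 0;

    len = strlen(src);
    if (len >= cap) {
        memcpy(dst, src, cap - 1);
        dst[cap - 1] = '\0';
        return 0;
    }

    memcpy(dst, src, len + 1);   /* len + 1 copies the terminator too */
    return 1;
}
```

Under these assumptions the strncpy call would become copy_topic(topicNames_[tmpCount], cle_elt->topic, CORELINK_MAXTOPICLEN), with a zero return value indicating a truncated topic name.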

 




_______________________________________________
paho-dev mailing list
paho-dev@xxxxxxxxxxx
https://dev.eclipse.org/mailman/listinfo/paho-dev

 
