The listener of publish method does not work asynchronously on Mqtt in Eclipse Paho Project [message #1726890]
Thu, 17 March 2016 09:56
Zue Hidden (Junior Member) Messages: 3 Registered: November 2015
I am developing a Java console application that uses the MQTT protocol to exchange messages with a broker (Mosquitto). I use Eclipse as my IDE and the Eclipse Paho client library as the MQTT implementation.
1) The publish method is shown below:
public void publish(String topicName, int qos, byte[] payload, boolean retained) throws Throwable {
    // Use a state machine to decide which step to do next. State change occurs
    // when a notification is received that an MQTT action has completed
    state = BEGIN;
    while (state != FINISH) {
        switch (state) {
            case BEGIN:
                // Connect using a non-blocking connect
                if (client.isConnected()) {
                    donext = true;
                    state = CONNECTED;
                }
                break;
            case CONNECTED:
                // Publish using a non-blocking publisher
                Publisher pub = new Publisher();
                pub.doPublish(topicName, qos, payload, retained);
                break;
            case ERROR:
                log("Error is received!");
                throw ex;
            case PUBLISHED:
                state = FINISH;
                donext = true;
                break;
        }
        if (state != FINISH) {
            // Wait (6 seconds) until notified about a state change and then perform next action.
            waitForStateChange(6000);
        }
    }
}
2) The Publisher class and its doPublish method are shown below:
/**
 * Publish in a non-blocking way and then sit back and wait to be
 * notified that the action has completed.
 */
public class Publisher {
    public void doPublish(String topicName, int qos, byte[] payload, boolean retained) {
        // Send / publish a message to the server
        // Get a token and setup an asynchronous listener on the token which
        // will be notified once the message has been delivered
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);
        String time = new Timestamp(System.currentTimeMillis()).toString();
        log("Publishing at: " + time + " to topic \"" + topicName + "\" qos " + qos);
        // Setup a listener object to be notified when the publish completes.
        IMqttActionListener pubListener = new IMqttActionListener() {
            public void onSuccess(IMqttToken asyncActionToken) {
                log("Publish Completed");
                state = PUBLISHED;
                carryOn();
            }
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                ex = exception;
                state = ERROR;
                log("Publish failed" + exception);
                carryOn();
            }
            public void carryOn() {
                synchronized (waiter) {
                    donext = true;
                    waiter.notifyAll();
                }
            }
        };
        try {
            // Publish the message
            client.publish(topicName, message, "Pub sample context", pubListener);
        } catch (MqttException e) {
            state = ERROR;
            donext = true;
            ex = e;
        }
    }
}
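The waitForStateChange method called in (1) is not shown in the post. A minimal sketch of how such a wait/notify handshake with carryOn could look, assuming waiter is a plain lock object and donext a boolean flag (the class name StateWaiter and the boolean return value are illustrative and not taken from the original code):

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-alone version of the waiter/donext handshake. */
final class StateWaiter {
    private final Object waiter = new Object();
    private boolean donext = false;

    /** Callback side: record that the action finished and wake any waiting thread. */
    public void carryOn() {
        synchronized (waiter) {
            donext = true;
            waiter.notifyAll();
        }
    }

    /**
     * Waiting side: block until carryOn() has been called or the timeout elapses.
     * Returns true if a notification was seen, false on timeout or interruption.
     */
    public boolean waitForStateChange(long maxWaitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        synchronized (waiter) {
            while (!donext) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) {
                    return false; // timed out without a notification
                }
                try {
                    waiter.wait(remainingMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            donext = false; // consume the notification for the next round
            return true;
        }
    }

    public static void main(String[] args) {
        StateWaiter w = new StateWaiter();
        // Simulate an asynchronous completion arriving from another thread.
        new Thread(w::carryOn).start();
        System.out.println("notified: " + w.waitForStateChange(6000));
    }
}
```

Checking the flag inside the loop guards against spurious wakeups and against a notification that arrives before the wait starts. The handshake only helps if carryOn is invoked on a different thread from the one that is waiting.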
Process:
In the publish method (1) above, the client is first checked to see whether it is connected.
- If it is connected, a Publisher object is created and its doPublish method is called.
- In the doPublish method, the message is published on this line:
    client.publish(topicName, message, "Pub sample context", pubListener);
The pubListener in the Publisher class is the important part here. If the message is sent successfully, I expect the listener's onSuccess method to be called and to end the while loop in the publish method (1) shown at the top of this question. I expect pubListener to be invoked asynchronously. However, after the message is published, the thread in the publish method (1) waits 6 seconds and then publishes the message again, because neither onSuccess nor any other method of pubListener has been called.
However, if I change the code to break out of the while loop in the publish method (1), the thread running that method finishes its work, and only then does the onSuccess method of pubListener start running. In other words, it looks as if pubListener cannot run asynchronously.
What I need is for the onSuccess method of pubListener to run asynchronously after the client.publish(..) call, provided the message was sent successfully. How can I do that?
EDIT: I realised that the IMqttToken passed to the listener cannot complete as long as the processing in the publish method (1) has not stopped. It looks like pubListener cannot run asynchronously, although it should. But how?
[Updated on: Thu, 17 March 2016 15:09]
Re: The listener of publish method does not work asynchronously on Mqtt in Eclipse Paho Project [message #1727024 is a reply to message #1727021]
Fri, 18 March 2016 10:34
Zue Hidden (Junior Member) Messages: 3 Registered: November 2015
Thank you for your reply. I solved it this morning. The publish process actually starts after a message arrives from the broker, that is, inside the messageArrived callback. In that situation pubListener's onSuccess method is never invoked, even though I verified that the message was published successfully.
What I tried: in the messageArrived callback I created a new thread and published the new message from that thread's run method. After that, pubListener started working properly!
I don't know for sure, but pubListener may have been deadlocked in the previous situation. In any case, it works properly now. Thank you again.
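One way to picture why the extra thread helps, as a sketch rather than a description of Paho's internals: assume the client delivers messageArrived and the action listener's onSuccess on one and the same callback thread. The simulation below uses only java.util.concurrent; the dispatcher executor stands in for that assumed callback thread, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical simulation of publishing from inside a callback; not Paho code. */
final class CallbackThreadDemo {

    /**
     * Runs one simulated "message arrived" callback on a single dispatcher thread.
     * The callback "publishes" (queues a completion callback on the same dispatcher)
     * and waits up to waitMillis for it. Returns true if the completion was seen.
     */
    static boolean run(boolean offload, long waitMillis) {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor(); // assumed callback thread
        ExecutorService worker = Executors.newSingleThreadExecutor();     // thread created by the application
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawCompletion = new AtomicBoolean(false);

        Runnable publishAndWait = () -> {
            CountDownLatch published = new CountDownLatch(1);
            // Stand-in for onSuccess: it can only run once the dispatcher is free.
            dispatcher.execute(published::countDown);
            try {
                sawCompletion.set(published.await(waitMillis, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.countDown();
        };

        // Stand-in for messageArrived: always invoked on the dispatcher thread.
        dispatcher.execute(() -> {
            if (offload) {
                worker.execute(publishAndWait); // hand off and return immediately
            } else {
                publishAndWait.run();           // block the dispatcher itself
            }
        });

        try {
            finished.await(waitMillis + 2000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcher.shutdownNow();
            worker.shutdownNow();
        }
        return sawCompletion.get();
    }

    public static void main(String[] args) {
        System.out.println("waiting inside the callback: " + run(false, 200));
        System.out.println("waiting on a separate thread: " + run(true, 200));
    }
}
```

Under that assumption, waiting inside the callback blocks the only thread that could deliver the completion, so the wait times out. Handing the publish-and-wait step to another thread leaves the dispatcher free to deliver it, which matches the behaviour described in this reply.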
[Updated on: Mon, 21 March 2016 13:37]