Skip to main content


Eclipse Community Forums
Forum Search:

Search      Help    Register    Login    Home
Home » Eclipse Projects » Paho » The listener of publish method does not work asynchronously on Mqtt in Eclipse Paho Project(The listener of publish method does not work asynchronously on Mqtt in Eclipse Paho Project)
The listener of publish method does not work asynchronously on Mqtt in Eclipse Paho Project [message #1726890] Thu, 17 March 2016 09:56 Go to next message
Zue Hidden is currently offline Zue HiddenFriend
Messages: 3
Registered: November 2015
Junior Member
I develop a console application in Java by using Mqtt protocol and messaging with the server/broker (Mosquitto). I use Eclipse as an editor and use Eclipse Paho Project in MQTT implementations.

1) Publish method is exemplified below;

public void publish(String topicName, int qos, byte[] payload, boolean retained) throws Throwable {
    // Use a state machine to decide which step to do next. State change occurs
    // when a notification is received that an MQTT action has completed

    state = BEGIN;

    while (state != FINISH) {
        switch (state) {
        case BEGIN:
            // Connect using a non-blocking connect
            if (client.isConnected()) {
                donext = true;
                state = CONNECTED;
            }
            break;
        case CONNECTED:
            // Publish using a non-blocking publisher
            Publisher pub = new Publisher();
            pub.doPublish(topicName, qos, payload, retained);
            break;
        case ERROR:
            log("Error is received!");
            throw ex;
        case PUBLISHED:
            state = FINISH;
            donext = true;
            break;
        }

        if (state != FINISH) {
            // Wait(6 seconds) until notified about a state change and then perform next action.
            waitForStateChange(6000);
        }
    }
}


2) Publisher class and doPublish method of this class are exemplified below;

/**
 * Publish in a non-blocking way and then sit back and wait to be
 * notified that the action has completed.
 */
public class Publisher {
    public void doPublish(String topicName, int qos, byte[] payload, boolean retained) {
        // Send / publish a message to the server
        // Get a token and setup an asynchronous listener on the token which
        // will be notified once the message has been delivered
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);

        String time = new Timestamp(System.currentTimeMillis()).toString();
        log("Publishing at: "+time+ " to topic \""+topicName+"\" qos "+qos);

        // Setup a listener object to be notified when the publish completes.
        //
        IMqttActionListener pubListener = new IMqttActionListener() {
            public void onSuccess(IMqttToken asyncActionToken) {
                log("Publish Completed");
                state = PUBLISHED;
                carryOn();
            }

            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                ex = exception;
                state = ERROR;
                log ("Publish failed" +exception);
                carryOn();
            }

            public void carryOn() {
                synchronized (waiter) {
                    donext=true;
                    waiter.notifyAll();
                }
            }
        };

        try {
            // Publish the message
            client.publish(topicName, message, "Pub sample context", pubListener);
        } catch (MqttException e) {
            state = ERROR;
            donext = true;
            ex = e;
        }
    }
}


Process;

In publish method(1) above, the client is checked if it is connected or not.

- If it is connected, then Publisher object is created and doPublish method of it is called.

- In doPublish method, the message is published on this line;

client.publish(topicName, message, "Pub sample context", pubListener);


"publistener" in Publisher class is important here, because if the message is sent succesfully, it is expected that onSuccess method is called inside in this listener and it stops 'while loop' in publish method(1) that is given in detail at the top of question. This "publistener" is expected to work asynchronously. However, after the message is published in publish method(1), thread waits 6 seconds and publishes the message again because of the fact that onSuccess or any other method of "publistener" are not called.

But, if I interfere in code to stop 'while loop' in publish method(1), I realised that the thread in publish method(1) finishes its own work and is over, then onSuccess method of "publistener" begin running. I mean it looks like that "publistener" cannot work asynchronously.

I need that after the line - client.publish(..), if the message is sent successfully, onSuccess method of "publistener" should work asynchrounously. How can I do that?

EDIT: I realised that IMqttToken of listener cannot be completed as long as the process in publish method(1) is not stopped. It looks like "publistener" cannot work asynchronously, although it should. But, how?

[Updated on: Thu, 17 March 2016 15:09]

Report message to a moderator

Re: The listener of publish method does not work asynchronously on Mqtt in Eclipse Paho Project [message #1727021 is a reply to message #1726890] Fri, 18 March 2016 10:17 Go to previous messageGo to next message
James Sutton is currently offline James SuttonFriend
Messages: 71
Registered: July 2015
Member
Hi,

It looks like you might want to re-think how your application is designed, I don't believe that you will be able to reference the state enum as it's inside a Local Context. The small example below should achieve what you are describing:

package sample;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PublisherExample {
	
	private boolean pubComplete;
	private boolean pubAttempted;
	static MqttAsyncClient asyncClient;
	String topic        = "/acTest";
	int qos 			= 0;
    String broker       = "tcp://localhost:1883";
    String clientId     = "JavaSample";
	
	public PublisherExample(){
		try {
			asyncClient = new MqttAsyncClient(broker, clientId);
		} catch (MqttException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
			
		PublisherExample publisherExample = new PublisherExample();
		publisherExample.publishMessage();
			
	}
	
	
	public void publishMessage() {
		try {
		IMqttActionListener pubListener = new IMqttActionListener() {
			
			@Override
			public void onSuccess(IMqttToken asyncActionToken) {
				setPubComplete(true);
				System.out.println("Publish Success!");
				
			}
			
			@Override
			public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
				setPubAttempted(false);
				System.out.println("Publish Failure!");
				
			}
		};
    	
	    	while(!pubComplete){
	    		if(!asyncClient.isConnected()){
	    			// Client is not connected
		    		asyncClient.connect();
		    	} else {
		    		while(!pubAttempted){
			    		// Client is connected
			    		MqttMessage msg = new MqttMessage();
			    		msg.setPayload("Hello World!".getBytes());
			    		msg.setQos(0);
			    		System.out.println("Attempting Publish");
			    		asyncClient.publish("/test", msg, null, pubListener);
			    		setPubAttempted(true);
		    		}
		    	}
	    		Thread.sleep(1000);
	    	}
    	System.out.println("Publish Complete!");
    	} catch (Exception ex){
    		System.err.println("Inturrupted");
    		ex.printStackTrace();
    	}
	}
	
	public void setPubComplete(boolean pubComplete){
		this.pubComplete = pubComplete;
	}
	
	public void setPubAttempted(boolean pubAttempted){
		this.pubAttempted = pubAttempted;
	}

}



Re: The listener of publish method does not work asynchronously on Mqtt in Eclipse Paho Project [message #1727024 is a reply to message #1727021] Fri, 18 March 2016 10:34 Go to previous message
Zue Hidden is currently offline Zue HiddenFriend
Messages: 3
Registered: November 2015
Junior Member
Thank you for your reply. I solved it in this morning. Actually, this publish process begins, after a message is arrived from the broker. But, in this process, this publistener cannot make onSuccess method begin, although I verify that the message is published successfully.

I tried that in messageArrived callback, I created new thread and I published new message in runnable method of this thread. Then, publistener began working properly!

I don't know but publistener might be in deadlock in previous situtation. However, it works properly now. Thank you again.

[Updated on: Mon, 21 March 2016 13:37]

Report message to a moderator

Previous Topic:Upcoming Virtual IoT meetup about mqtt-spy
Next Topic:MQTT javascript websockets ssl client within android web browser
Goto Forum:
  


Current Time: Fri Apr 26 01:16:20 GMT 2024

Powered by FUDForum. Page generated in 0.03317 seconds
.:: Contact :: Home ::.

Powered by: FUDforum 3.0.2.
Copyright ©2001-2010 FUDforum Bulletin Board Software

Back to the top