Paho client using asynchronous API [message #1767941] Wed, 12 July 2017 09:52
Saleh Mohd
Hi everyone,

I am writing an MQTT client with the Paho asynchronous API. I am adapting existing code and using ActiveMQ 5.15.0 as my MQTT server. The problem is that the program seems to work only in debug mode: when I set a trace point and debug it, I can see the publisher sending messages and the subscriber receiving them. When I run it normally, it just blocks straight away. Below are my three classes and the program output in both debug and run mode.

A class that implements IMqttActionListener:
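import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;

public class MessageActionListener implements IMqttActionListener {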
	protected String messageText;
	protected String topic;
	protected String userContext;

	public MessageActionListener(String topic, String messageText, String userContext) {
		this.topic = topic;
		this.messageText = messageText;
		this.userContext = userContext;
	}

	@Override
	public void onSuccess(IMqttToken asyncActionToken) {
		// Compare from the known non-null side so a token without a user context cannot throw
		if ((asyncActionToken != null) && userContext.equals(asyncActionToken.getUserContext())) {
			System.out.println(String.format("Message '%s' published to topic '%s'", messageText, topic));
		}
	}

	@Override
	public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
		exception.printStackTrace();
	}
}


A class that implements MqttCallback and IMqttActionListener:
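import java.io.UnsupportedEncodingException;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttMessageClient implements MqttCallback, IMqttActionListener {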
	protected String clientId;
	protected String broker;
	protected String publishTopic;
	protected String subscribeTopic;
	protected MqttAsyncClient client;
	protected MemoryPersistence memoryPersistence;
	protected IMqttToken connectToken;
	protected IMqttToken subscribeToken;
	protected static final int QoS = 0;
	public static final String ENCODING = "UTF-8";

	public MqttMessageClient(String broker, String publishTopic, String subscribeTopic) {
		this.broker = broker;
		this.publishTopic = publishTopic;
		this.subscribeTopic = subscribeTopic;
		clientId = MqttAsyncClient.generateClientId();
	}

	public void establishConnection() {
		try {
			MqttConnectOptions options = new MqttConnectOptions();
			memoryPersistence = new MemoryPersistence();
			clientId = MqttAsyncClient.generateClientId();
			client = new MqttAsyncClient(broker, clientId, memoryPersistence);
			// use this instance as the callback
			client.setCallback(this);
			connectToken = client.connect(options, null, this);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	public boolean isConnected() {
		return (client != null) && (client.isConnected());
	}

	@Override
	public void onSuccess(IMqttToken asyncActionToken) {
		if (asyncActionToken.equals(connectToken)) {
			System.out.println(String.format("Successfully connected to %s", broker));
			try {
				subscribeToken = client.subscribe(subscribeTopic, QoS, null, this);
			} catch (MqttException e) {
				e.printStackTrace();
			}
		} else if (asyncActionToken.equals(subscribeToken)) {
			System.out.println(String.format("%s subscribed to the %s topic", clientId, subscribeTopic));
		}

	}

	public MessageActionListener publishTextMessage(String messageText) {
		byte[] bytesMessage;
		try {
			bytesMessage = messageText.getBytes(ENCODING);
			MqttMessage message;
			message = new MqttMessage(bytesMessage);
			message.setRetained(true);
			String userContext = "ListeningMessage";
			MessageActionListener actionListener = new MessageActionListener(publishTopic, messageText, userContext);
			if (isConnected())
				client.publish(publishTopic, message, userContext, actionListener);
			return actionListener;
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
			return null;
		} catch (MqttException e) {
			e.printStackTrace();
			return null;
		}
	}

	@Override
	public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
		exception.printStackTrace();
	}

	@Override
	public void connectionLost(Throwable cause) {
		cause.printStackTrace();
	}

	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		if (!topic.equals(subscribeTopic)) {
			return;
		}

		String messageText = new String(message.getPayload(), ENCODING);
		System.out.println(String.format("received message from topic %s: %s", topic, messageText));

	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		// Delivery for a message has been completed
		// and all acknowledgements have been received
	}

	public String getClientId() {
		return clientId;
	}

}


Main class:
public class Main {
	public static void main(String[] args) {
		String broker = "tcp://100.444.100.444:1883";
		String topic = "test/topic";

		MqttMessageClient mqttClient = new MqttMessageClient(broker, topic, topic);
		mqttClient.establishConnection();
		int count = 0;
		while (count < 5) {
			mqttClient.publishTextMessage("test message " + count);
			count++;
		}		
	}
}


Program output in normal "run" mode.
Successfully connected to tcp://10.4.194.9:1883
paho737188559278759 subscribed to the test/topic topic


Program output in "debug" mode.
paho737383850092296 subscribed to the test/topic topic
Message 'test message 0' published to topic 'test/topic'
received message from topic test/topic: test message 0
Message 'test message 1' published to topic 'test/topic'
received message from topic test/topic: test message 1
Message 'test message 2' published to topic 'test/topic'
received message from topic test/topic: test message 2
Message 'test message 3' published to topic 'test/topic'
received message from topic test/topic: test message 3
Message 'test message 4' published to topic 'test/topic'
received message from topic test/topic: test message 4


Any help with what I am doing wrong would be highly appreciated.

Regards,
Saleh.
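
One possible explanation, offered as an assumption rather than a confirmed diagnosis: connect() on the asynchronous client returns before the connection is established, so in a normal run the loop in main may finish while isConnected() is still false, and every publish is silently skipped. Pausing in the debugger would give the connection time to complete. The sketch below reproduces that timing with a hypothetical FakeAsyncClient built only on the Java standard library (it is not part of Paho), and shows how waiting on a CountDownLatch before publishing changes the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ConnectRaceSketch {

    /** Hypothetical stand-in for an asynchronous client; not a Paho class. */
    static class FakeAsyncClient {
        private volatile boolean connected = false;
        private final List<String> published = new ArrayList<>();

        /** Returns immediately; the "connection" completes later on another thread. */
        void connect(Runnable onSuccess) {
            Thread worker = new Thread(() -> {
                try {
                    Thread.sleep(200); // simulated network handshake
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                connected = true;
                onSuccess.run();
            });
            worker.setDaemon(true);
            worker.start();
        }

        boolean isConnected() {
            return connected;
        }

        synchronized void publish(String text) {
            published.add(text);
        }

        synchronized int publishedCount() {
            return published.size();
        }
    }

    /** Mirrors the loop in Main: publish five messages, skipping any attempt made while disconnected. */
    static int publishFive(boolean waitForConnect) throws InterruptedException {
        FakeAsyncClient client = new FakeAsyncClient();
        CountDownLatch connectedLatch = new CountDownLatch(1);
        client.connect(connectedLatch::countDown);

        // Block until the success callback has fired, or give up after 5 seconds.
        if (waitForConnect && !connectedLatch.await(5, TimeUnit.SECONDS)) {
            return 0;
        }

        for (int count = 0; count < 5; count++) {
            if (client.isConnected()) {
                client.publish("test message " + count);
            }
        }
        return client.publishedCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("published without waiting: " + publishFive(false));
        System.out.println("published after waiting:   " + publishFive(true));
    }
}
```

With Paho itself, the analogous step would be to call waitForCompletion() on the token returned by connect(), or to start publishing from the connect listener's onSuccess callback instead of directly after establishConnection().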