Hi everyone,
I am writing an MQTT client with Paho asynchronous api. I am adapting this code here, and using ActiveMQ 5.15.0 as my MQTT server. The problem I am having is that, the program seems to work only in debug mode. That is, when I put a trace point and debug it, I can see both publisher and subscriber sending and receiving messages respectively. But when I run it normally, It just blocks straight away. Below are my three classes and program output in both debug and run mode.
A class that implements IMqttActionListener:
public class MessageActionListener implements IMqttActionListener {
protected String messageText;
protected String topic;
protected String userContext;
public MessageActionListener(String topic, String messageText, String userContext) {
this.topic = topic;
this.messageText = messageText;
this.userContext = userContext;
}
@Override
public void onSuccess(IMqttToken asyncActionToken) {
if ((asyncActionToken != null) && asyncActionToken.getUserContext().equals(userContext)) {
System.out.println(String.format("Message '%s' published to topic '%s'", messageText, topic));
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
}
}
A class that implements MqttCallback and IMqttActionListener:
public class MqttMessageClient implements MqttCallback, IMqttActionListener {
protected String clientId;
protected String broker;
protected String publishTopic;
protected String subscribeTopic;
protected MqttAsyncClient client;
protected MemoryPersistence memoryPersistence;
protected IMqttToken connectToken;
protected IMqttToken subscribeToken;
protected static final int QoS = 0;
public static final String ENCODING = "UTF-8";
public MqttMessageClient(String broker, String publishTopic, String subscribeTopic) {
this.broker = broker;
this.publishTopic = publishTopic;
this.subscribeTopic = subscribeTopic;
clientId = MqttAsyncClient.generateClientId();
}
public void establishConnection() {
try {
MqttConnectOptions options = new MqttConnectOptions();
memoryPersistence = new MemoryPersistence();
clientId = MqttAsyncClient.generateClientId();
client = new MqttAsyncClient(broker, clientId, memoryPersistence);
// use this instance as the callback
client.setCallback(this);
connectToken = client.connect(options, null, this);
} catch (MqttException e) {
e.printStackTrace();
}
}
public boolean isConnected() {
return (client != null) && (client.isConnected());
}
@Override
public void onSuccess(IMqttToken asyncActionToken) {
if (asyncActionToken.equals(connectToken)) {
System.out.println(String.format("Successfully connected to %s", broker));
try {
subscribeToken = client.subscribe(subscribeTopic, QoS, null, this);
} catch (MqttException e) {
e.printStackTrace();
}
} else if (asyncActionToken.equals(subscribeToken)) {
System.out.println(String.format("%s subscribed to the %s topic", clientId, subscribeTopic));
}
}
public MessageActionListener publishTextMessage(String messageText) {
byte[] bytesMessage;
try {
bytesMessage = messageText.getBytes(ENCODING);
MqttMessage message;
message = new MqttMessage(bytesMessage);
message.setRetained(true);
String userContext = "ListeningMessage";
MessageActionListener actionListener = new MessageActionListener(publishTopic, messageText, userContext);
if (isConnected())
client.publish(publishTopic, message, userContext, actionListener);
return actionListener;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return null;
} catch (MqttException e) {
e.printStackTrace();
return null;
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
}
@Override
public void connectionLost(Throwable cause) {
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
if (!topic.equals(subscribeTopic)) {
return;
}
String messageText = new String(message.getPayload(), ENCODING);
System.out.println(String.format("received message from topic %s: %s", topic, messageText));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Delivery for a message has been completed
// and all acknowledgements have been received
}
public String getClientId() {
return clientId;
}
}
Main class:
public class Main {
public static void main(String[] args) {
String broker = "tcp://100.444.100.444:1883";
String topic = "test/topic";
MqttMessageClient mqttClient = new MqttMessageClient(broker, topic, topic);
mqttClient.establishConnection();
int count = 0;
while (count < 5) {
mqttClient.publishTextMessage("test message " + count);
count++;
}
}
}
Program output in normal "run" mode.
Successfully connected to tcp://10.4.194.9:1883
paho737188559278759 subscribed to the test/topic topic
Program output in "debug" mode.
paho737383850092296 subscribed to the test/topic topic
Message 'test message 0' published to topic 'test/topic'
received message from topic test/topic: test message 0
Message 'test message 1' published to topic 'test/topic'
received message from topic test/topic: test message 1
Message 'test message 2' published to topic 'test/topic'
received message from topic test/topic: test message 2
Message 'test message 3' published to topic 'test/topic'
received message from topic test/topic: test message 3
Message 'test message 4' published to topic 'test/topic'
received message from topic test/topic: test message 4
Any help on what or where I am doing it wrong will be highly appreciated.
Regards,
Saleh.