Home » Eclipse Projects » Paho » Automatic reconnect in the Java client
| |
Re: Automatic reconnect in the Java client [message #1746618 is a reply to message #1746597] |
Tue, 01 November 2016 22:37 |
Keith Hughes Messages: 4 Registered: February 2015 |
Junior Member |
|
|
Hi James,
Thanks for the response!
I did wonder whether the retry interval had backed off so far that I would not see a reconnect unless I was willing to wait, so I walked away and came back half an hour later.
I will include the code and the log output. The code using the client is written in Scala, but isn't using any Scala idioms that won't be immediately understandable by a Java person.
import org.eclipse.paho.client.mqttv3.IMqttActionListener
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.IMqttMessageListener
import org.eclipse.paho.client.mqttv3.IMqttToken
import org.eclipse.paho.client.mqttv3.MqttAsyncClient
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
/**
 * An MQTT communication endpoint backed by the Eclipse Paho asynchronous client.
 *
 * The endpoint connects with a clean session and Paho's automatic reconnect
 * enabled. Because a clean session discards broker-side subscriptions, the
 * endpoint re-issues subscriptions itself from the `connectComplete` callback.
 *
 * @param mqttBrokerDescription
 *          description of the broker; its broker address is used as the server URI
 * @param mqttClientId
 *          the client ID presented to the broker
 * @param executor
 *          executor service supplied by the caller (not used by the code in this class)
 * @param log
 *          the log for the endpoint
 */
class PahoMqttCommunicationEndpoint(mqttBrokerDescription: MqttBrokerDescription, mqttClientId: String,
    executor: ScheduledExecutorService, log: Log) extends MqttCommunicationEndpoint with IdempotentManagedResource {

  // Local import so the class does not depend on a file-level import being present.
  import scala.util.control.NonFatal

  private var persistence: MqttClientPersistence = null
  private var mqttClient: MqttAsyncClient = null
  private val mqttConnectOptions = new MqttConnectOptions()

  // Both lists are replaced under `synchronized` but read from Paho callback
  // methods without a lock; @volatile makes each replacement visible to readers.
  @volatile private var connectionListeners: List[MqttConnectionListener] = List()
  @volatile private var subscribers: List[MqttSubscriber] = List()

  // Not read or written anywhere else in this class.
  private var isReconnection = false

  /**
   * Create the Paho client, install the callbacks and start an asynchronous connect.
   *
   * @throws SmartSpacesException
   *           if the client cannot be created or the connect cannot be started
   */
  override def onStartup(): Unit = {
    try {
      persistence = new MemoryPersistence()
      mqttClient =
        new MqttAsyncClient(mqttBrokerDescription.getBrokerAddress(), mqttClientId, persistence)

      mqttClient.setCallback(new MqttCallbackExtended() {
        override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
          // A successful connection is not an error condition, so log it at info level.
          log.info("MQTT connection successful to " + mqttBrokerDescription.getBrokerAddress())
          brokerConnectSuccessful(reconnect)
        }

        override def connectionLost(cause: Throwable): Unit = {
          log.error("Lost MQTT connection to " + mqttBrokerDescription.getBrokerAddress(), cause)
          brokerConnectLost()
        }

        override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
          log.info("Got delivery token " + token.getResponse())
        }

        override def messageArrived(topic: String, message: MqttMessage): Unit = {
          // Per-topic delivery is handled by the MqttSubscriber listeners.
          log.info("Received top level MQTT message")
        }
      })

      mqttConnectOptions.setCleanSession(true)
      mqttConnectOptions.setAutomaticReconnect(true)

      log.info("Connecting to broker: " + mqttClient.getServerURI())
      mqttClient.connect(mqttConnectOptions, new IMqttActionListener() {
        override def onSuccess(token: IMqttToken): Unit = {
          log.info("MQTT broker connect is successful on token " + token)
        }

        override def onFailure(token: IMqttToken, cause: Throwable): Unit = {
          log.error("MQTT broker connect has failure on token " + token, cause)
          brokerConnectFailure()
        }
      })
    } catch {
      case NonFatal(e) => throw new SmartSpacesException("Error when connecting to MQTT broker", e)
    }
  }

  /**
   * Disconnect the client if it is currently connected and drop the reference to it.
   *
   * NOTE(review): a client that exists but is not connected at this moment is
   * left untouched, as in the original code; with automatic reconnect enabled
   * it may still be trying to reconnect -- confirm whether it should be closed.
   */
  override def onShutdown(): Unit = {
    if (mqttClient != null && mqttClient.isConnected()) {
      try {
        mqttClient.disconnect()
      } catch {
        case NonFatal(e) => log.error("Could not disconnect the MQTT client", e)
      }
      mqttClient = null
    }
  }

  override def getMqttBrokerDescription(): MqttBrokerDescription = mqttBrokerDescription

  override def getMqttClientId(): String = mqttClientId

  /**
   * Register a listener for connection events.
   *
   * @return this endpoint
   */
  override def addConnectionListener(listener: MqttConnectionListener): MqttCommunicationEndpoint = {
    synchronized {
      connectionListeners = listener :: connectionListeners
    }
    this
  }

  /**
   * Register a subscriber for a topic, subscribing immediately when the
   * endpoint is started and the client is connected.
   *
   * @param topicName
   *          the topic to subscribe to
   * @param listener
   *          the listener that receives message payloads
   * @param qos
   *          the quality of service passed to the Paho subscribe call
   * @param autoreconnect
   *          {@code true} if the subscription should be re-issued after a reconnect
   *
   * @return this endpoint
   */
  override def subscribe(topicName: String, listener: MqttSubscriberListener, qos: Int, autoreconnect: Boolean): MqttCommunicationEndpoint = {
    val subscriber = new MqttSubscriber(topicName, listener, qos, autoreconnect)

    // Guard against a missing client so that this cannot fail with a NullPointerException.
    val client = mqttClient
    if (resourceState == ManagedResourceState.STARTED && client != null && client.isConnected()) {
      subscriber.subscribe()
    }

    synchronized {
      subscribers = subscriber :: subscribers
    }
    this
  }

  override def getLog(): Log = log

  /**
   * The broker connection was successful.
   *
   * @param reconnect
   *          {@code true} if a reconnection success
   */
  private def brokerConnectSuccessful(reconnect: Boolean): Unit = {
    // The connection uses a clean session, so after a reconnect the broker has
    // no subscriptions for this client. Re-issue those that asked for it.
    subscribers.foreach { subscriber =>
      if (!reconnect || subscriber.autoreconnect) {
        try {
          subscriber.subscribe()
        } catch {
          // One failed subscription must not escape into the Paho callback or
          // stop the remaining subscribers and listeners from being processed.
          case NonFatal(e) => log.error("Could not subscribe to MQTT topic " + subscriber.topicName, e)
        }
      }
    }

    notifyConnectionListeners("connectionSuccessful")(_.onMqttConnectionSuccessful(this, reconnect))
  }

  /**
   * The broker connection was lost.
   */
  private def brokerConnectLost(): Unit = {
    notifyConnectionListeners("connectionLost")(_.onMqttConnectionLost(this))
  }

  /**
   * The broker connection never happened.
   */
  private def brokerConnectFailure(): Unit = {
    notifyConnectionListeners("connectionFailure")(_.onMqttConnectionFailure(this))
  }

  /**
   * Invoke a callback on every connection listener, logging any listener that fails.
   *
   * @param eventName
   *          name of the event, used only in the error log message
   * @param notification
   *          the call to make on each listener
   */
  private def notifyConnectionListeners(eventName: String)(notification: MqttConnectionListener => Unit): Unit = {
    connectionListeners.foreach { listener =>
      try {
        notification(listener)
      } catch {
        case NonFatal(e) => log.error("MQTT connection listener failed on " + eventName, e)
      }
    }
  }

  /**
   * A subscription to one topic, acting as the Paho message listener for that topic.
   */
  private class MqttSubscriber(val topicName: String, val listener: MqttSubscriberListener, val qos: Int, val autoreconnect: Boolean) extends IMqttMessageListener {

    /**
     * Subscribe to the topic on the current client.
     *
     * @throws SmartSpacesException
     *           if Paho rejects the subscribe call
     */
    def subscribe(): Unit = {
      log.info("Subscribing to MQTT topic " + topicName)
      try {
        mqttClient.subscribe(topicName, qos, this)
      } catch {
        case e: MqttException => throw SmartSpacesException.newFormattedException(e, "Could not subscribe to MQTT topic %s",
          topicName)
      }
    }

    override def messageArrived(topic: String, message: MqttMessage): Unit = {
      try {
        listener.handleMessage(PahoMqttCommunicationEndpoint.this, topicName, message.getPayload())
      } catch {
        case NonFatal(e) => log.error("Error while handling MQTT message on topic " + topicName, e)
      }
    }
  }
}
The log data is given below. The sequence is:
1) Have local Mosquitto running
2) Start my code and confirm the connection in the logs
3) Stop the local mosquitto
4) Note in logs that the connection is gone
5) Restart local mosquitto
INFO: Connecting to broker: tcp://127.0.0.1:1883
Nov 01, 2016 4:24:30 PM io.smartspaces.logging.StandardExtendedLog info
INFO: MQTT broker connect is successful on token org.eclipse.paho.client.mqttv3.MqttToken@59425bae
Nov 01, 2016 4:24:30 PM io.smartspaces.logging.StandardExtendedLog error
SEVERE: MQTT connection successful to tcp://127.0.0.1:1883
Nov 01, 2016 4:24:30 PM io.smartspaces.logging.StandardExtendedLog info
INFO: Subscribing to MQTT topic /home/sensor
Nov 01, 2016 4:25:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039130380
Nov 01, 2016 4:25:57 PM io.smartspaces.logging.StandardExtendedLog error
SEVERE: Lost MQTT connection to tcp://127.0.0.1:1883
Connection lost (32109) - java.io.EOFException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
... 1 more
Nov 01, 2016 4:26:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039190380
Nov 01, 2016 4:27:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039250380
Nov 01, 2016 4:28:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039310380
Nov 01, 2016 4:29:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039370380
Nov 01, 2016 4:30:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039430380
I restarted the broker between Nov 01, 2016 4:26:30 PM and Nov 01, 2016 4:27:30 PM.
Here are the mosquitto logs:
1478037985: mosquitto version 1.4.8 (build date Fri, 19 Feb 2016 12:03:16 +0100) starting
1478037985: Config loaded from /etc/mosquitto/mosquitto.conf.
1478037985: Opening ipv4 listen socket on port 1883.
1478037985: Opening ipv6 listen socket on port 1883.
1478039070: New connection from 127.0.0.1 on port 1883.
1478039070: New client connected from 127.0.0.1 as /home/sensor/agregator2 (c1, k60).
1478039157: Error in poll: Interrupted system call.
1478039157: mosquitto version 1.4.8 terminating
1478039199: mosquitto version 1.4.8 (build date Fri, 19 Feb 2016 12:03:16 +0100) starting
1478039199: Config loaded from /etc/mosquitto/mosquitto.conf.
1478039199: Opening ipv4 listen socket on port 1883.
1478039199: Opening ipv6 listen socket on port 1883.
You can see my client connect the first time, but the second time it never happens.
|
|
|
Re: Automatic reconnect in the Java client [message #1746637 is a reply to message #1746618] |
Wed, 02 November 2016 11:20 |
James Sutton Messages: 71 Registered: July 2015 |
Member |
|
|
I can't immediately see anything wrong with your code, so I tried writing my own basic example to test automatic reconnect:
package paho
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
import org.eclipse.paho.client.mqttv3._
/**
 * Minimal example that connects to a local broker with automatic reconnect
 * enabled and publishes one message every time a connection is established.
 */
class PahoPublish {
  private var persistence: MqttClientPersistence = null
  private var mqttClient: MqttAsyncClient = null
  private var mqttConnectOptions: MqttConnectOptions = null

  /**
   * Create the client, install the callbacks and start an asynchronous connect
   * to `tcp://localhost:1883`.
   */
  def main(): Unit = {
    println("Eclipse Paho Scala Publish Example")

    persistence = new MqttDefaultFilePersistence()
    mqttConnectOptions = new MqttConnectOptions()
    mqttClient = new MqttAsyncClient("tcp://localhost:1883", "PahoScalaExample", persistence)

    mqttConnectOptions.setAutomaticReconnect(true)
    mqttConnectOptions.setCleanSession(true)

    mqttClient.setCallback(new MqttCallbackExtended {
      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
        println("Paho Extended Callback: Connection to " + serverURI + " completed, reconnect: " + reconnect)
        println("Publishing Message")
        // Encode explicitly so the bytes do not depend on the platform default charset.
        val message: MqttMessage =
          new MqttMessage("Hello World!".getBytes(java.nio.charset.StandardCharsets.UTF_8))
        message.setQos(1)
        mqttClient.publish("hello/world", message)
        println("Message Published")
      }

      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
        println("Paho Extended Callback: Delivery Complete")
      }

      override def messageArrived(topic: String, message: MqttMessage): Unit = {
        println("Paho Extended Callback: Message Arrived to topic: " + topic)
        // Decode the payload; calling toString on the byte array would print
        // its identity (for example "[B@1a2b3c") instead of the text.
        println(" Message: " + new String(message.getPayload, java.nio.charset.StandardCharsets.UTF_8))
      }

      override def connectionLost(cause: Throwable): Unit = {
        // Tolerate a missing cause instead of failing inside the callback.
        val reason = if (cause == null) "unknown" else cause.getMessage
        println("Paho Extended Callback: Connection Lost: " + reason)
      }
    })

    // Connect to the broker
    mqttClient.connect(mqttConnectOptions)
  }
}
(I'm new to Scala, so hopefully that looks OK.)
When I tested it in a similar way, it was able to successfully reconnect automatically once I re-started my mosquitto broker:
Eclipse Paho Scala Publish Example
Paho Extended Callback: Connection to tcp://localhost:1883 completed, reconnect: false
Publishing Message
Message Published
Paho Extended Callback: Delivery Complete
Paho Extended Callback: Connection Lost: Connection lost
Paho Extended Callback: Connection to tcp://localhost:1883 completed, reconnect: true
Publishing Message
Message Published
Paho Extended Callback: Delivery Complete
1.4.9/sbin [ ./mosquitto -c myConf.conf ] 8:36 am
1478085179: mosquitto version 1.4.9 (build date 2016-06-18 16:16:49+0100) starting
1478085179: Config loaded from myConf.conf.
1478085179: Opening ipv4 listen socket on port 1883.
1478085179: Opening ipv6 listen socket on port 1883.
1478085192: New connection from 127.0.0.1 on port 1883.
1478085192: New client connected from 127.0.0.1 as PahoScalaExample (c1, k60).
^C1478085197: mosquitto version 1.4.9 terminating
1.4.9/sbin [ ./mosquitto -c myConf.conf ] 11:13 am
1478085202: mosquitto version 1.4.9 (build date 2016-06-18 16:16:49+0100) starting
1478085202: Config loaded from myConf.conf.
1478085202: Opening ipv4 listen socket on port 1883.
1478085202: Opening ipv6 listen socket on port 1883.
1478085204: New connection from 127.0.0.1 on port 1883.
1478085204: New client connected from 127.0.0.1 as PahoScalaExample (c1, k60).
Are you doing anything to the client in "brokerConnectFailure()"? I'm wondering if that is having an effect on it?
|
|
| | |
Goto Forum:
Current Time: Wed Apr 24 20:00:31 GMT 2024
Powered by FUDForum. Page generated in 0.02599 seconds
|