Skip to main content


Eclipse Community Forums
Forum Search:

Search      Help    Register    Login    Home
Home » Eclipse Projects » Paho » Automatic reconnect in the Java client
Automatic reconnect in the Java client [message #1746559] Mon, 31 October 2016 21:36 Go to next message
Keith Hughes is currently offline Keith HughesFriend
Messages: 4
Registered: February 2015
Junior Member
Hi folks,

I am trying to discover if the automatic reconnect on the Paho Java client is functional.

I have an async client that registers both a IMqttActionListener for the connection call and an MqttCallbackExtended as a generic callback on the client so I can get successful connections and lost connection callbacks. I also have the following call

mqttConnectOptions.setAutomaticReconnect(true);

I point the client at a local broker (Mosquitto) on my machine. Everything attaches appropriately and I get my callbacks. I then shutdown the mosquitto service and get MqttCallbackExtended.connectionLost() called.

I then restart the broker. I never ever get a reconnection attempt.

I looked at the code, I am using Paho 1.1.0 from the central Maven Repository. There is reconnect code in what I have, it just doesn't seem to be being called. is it not fully functional, or am I doing something wrong? I will happily post my code if it is supposed to be working so we can track what I might be doing wrong.

-Keith



Re: Automatic reconnect in the Java client [message #1746597 is a reply to message #1746559] Tue, 01 November 2016 14:49 Go to previous messageGo to next message
James Sutton is currently offline James SuttonFriend
Messages: 71
Registered: July 2015
Member
Hi Keith,

That's odd, from what you've explained, it should all be working.
Would you mind posting your code? Also, how long are you waiting for the reconnect? It should attempt to reconnect straight away, but will then double the reconnect delay every time it fails up until the delay reaches about two minutes at which point it will just attempt to connect every two minutes from then on.
Re: Automatic reconnect in the Java client [message #1746618 is a reply to message #1746597] Tue, 01 November 2016 22:37 Go to previous messageGo to next message
Keith Hughes is currently offline Keith HughesFriend
Messages: 4
Registered: February 2015
Junior Member
Hi James,

Thanks for the response!

I did wonder if it might be something like the retry frequency had gotten to a place where I was just not going to see it unless I was willing to wait, so I walked away and came back half an hour later.

I will include the code and the log output. The code using the client is written in Scala, but isn't using any Scala idioms that won't be immediately understandable by a Java person.

import org.eclipse.paho.client.mqttv3.IMqttActionListener
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.IMqttMessageListener
import org.eclipse.paho.client.mqttv3.IMqttToken
import org.eclipse.paho.client.mqttv3.MqttAsyncClient
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

class PahoMqttCommunicationEndpoint(mqttBrokerDescription: MqttBrokerDescription, mqttClientId: String,
    executor: ScheduledExecutorService, log: Log) extends MqttCommunicationEndpoint with IdempotentManagedResource {

   private var persistence: MqttClientPersistence = null

  private var mqttClient: MqttAsyncClient = null

  private val mqttConnectOptions = new MqttConnectOptions()

  private var connectionListeners: List[MqttConnectionListener] = List()

  private var subscribers: List[MqttSubscriber] = List()

  private var isReconnection = false

  override def onStartup(): Unit = {
    try {
      persistence = new MemoryPersistence()
      mqttClient =
        new MqttAsyncClient(mqttBrokerDescription.getBrokerAddress(), mqttClientId, persistence)

      mqttClient.setCallback(new MqttCallbackExtended() {
        override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
          log.error("MQTT connection successful to " + mqttBrokerDescription.getBrokerAddress)

          brokerConnectSuccessful(reconnect)
        }

        override def connectionLost(cause: Throwable): Unit = {
          log.error("Lost MQTT connection to " + mqttBrokerDescription.getBrokerAddress, cause)

          brokerConnectLost()
        }

        override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
          log.info("Got delivery token " + token.getResponse())
        }

        override def messageArrived(topic: String, message: MqttMessage): Unit = {
          log.info("Received top level MQTT message")
          //handleMessageArrived(topic, message)
        }
      })

      mqttConnectOptions.setCleanSession(true)
      mqttConnectOptions.setAutomaticReconnect(true)

      log.info("Connecting to broker: " + mqttClient.getServerURI())
       mqttClient.connect(mqttConnectOptions, new IMqttActionListener() {
        override def onSuccess(token: IMqttToken): Unit = {
          log.info("MQTT broker connect is successful on token " + token)
        }

        override def onFailure(token: IMqttToken, cause: Throwable): Unit = {
          log.error("MQTT broker connect has failure on token " + token, cause)

          brokerConnectFailure()
        }
      })
    } catch {
      case e: Throwable => throw new SmartSpacesException("Error when connecting to MQTT broker", e)
    }
  }

  override def onShutdown(): Unit = {
    if (mqttClient != null && mqttClient.isConnected()) {
      try {
        mqttClient.disconnect()
      } catch {
        case e: Throwable => log.error("Could not disconnect the MQTT client", e)
      }
      mqttClient = null
    }
  }

  override def getMqttBrokerDescription(): MqttBrokerDescription = {
    return mqttBrokerDescription
  }

  override def getMqttClientId(): String = {
    return mqttClientId
  }

  override def addConnectionListener(listener: MqttConnectionListener): MqttCommunicationEndpoint = {
    synchronized {
      this.connectionListeners = listener :: connectionListeners
    }

    this
  }

  override def subscribe(topicName: String, listener: MqttSubscriberListener, qos: Int, autoreconnect: Boolean): MqttCommunicationEndpoint = {
    val subscriber = new MqttSubscriber(topicName, listener, qos, autoreconnect)

    if (resourceState == ManagedResourceState.STARTED && mqttClient.isConnected()) {
      subscriber.subscribe()
    }

    synchronized {
      subscribers = subscriber :: subscribers
    }

    this
  }

  override def getLog(): Log = {
    log
  }

  /**
   * The broker connection was successful.
   *
   * @param reconnect
   *      {@code true} if a reconnection success
   */
  private def brokerConnectSuccessful(reconnect: Boolean): Unit = {
    // Subscribe all subscribers.
    subscribers.foreach { (subscriber) =>
      if (!reconnect /* || subscriber.autoreconnect */ ) {
        subscriber.subscribe
      }
    }

    connectionListeners.foreach { listener =>
      try {
        listener.onMqttConnectionSuccessful(this, reconnect)
      } catch {
        case e: Throwable => log.error("MQTT connection listener failed on connectionSuccessful", e)
      }
    }
  }

  /**
   * The broker connection was lost.
   */
  private def brokerConnectLost(): Unit = {
    connectionListeners.foreach { listener =>
      try {
        listener.onMqttConnectionLost(this)
      } catch {
        case e: Throwable => log.error("MQTT connection listener failed on connectionLost", e)
      }
    }
  }

  /**
   * The broker connection never happened.
   */
  private def brokerConnectFailure(): Unit = {
    connectionListeners.foreach { listener =>
      try {
        listener.onMqttConnectionFailure(this)
      } catch {
        case e: Throwable => log.error("MQTT connection listener failed on connectionLost", e)
      }
    }
  }

  private class MqttSubscriber(val topicName: String, val listener: MqttSubscriberListener, val qos: Int, val autoreconnect: Boolean) extends IMqttMessageListener {

    def subscribe(): Unit = {
      log.info("Subscribing to MQTT topic " + topicName)

      try {
        mqttClient.subscribe(topicName, qos, this)
      } catch {
        case e: MqttException => throw SmartSpacesException.newFormattedException(e, "Could not subscribe to MQTT topic %s",
          topicName)
      }
    }

    override def messageArrived(topic: String, message: MqttMessage): Unit = {
      try {
        listener.handleMessage(PahoMqttCommunicationEndpoint.this, topicName, message.getPayload())
      } catch {
        case e: Throwable => log.error(String.format("Error while handling MQTT message on topic %s", topicName), e)
      }
    }
  }
}


The log data is given below. The sequence is:

1) Have local Mosquitto running
2) Start my code and confirm the connection in the logs
3) Stop the local mosquitto
4) Note in logs that the connection is gone
5) Restart local mosquitto

INFO: Connecting to broker: tcp://127.0.0.1:1883
Nov 01, 2016 4:24:30 PM io.smartspaces.logging.StandardExtendedLog info
INFO: MQTT broker connect is successful on token org.eclipse.paho.client.mqttv3.MqttToken@59425bae
Nov 01, 2016 4:24:30 PM io.smartspaces.logging.StandardExtendedLog error
SEVERE: MQTT connection successful to tcp://127.0.0.1:1883
Nov 01, 2016 4:24:30 PM io.smartspaces.logging.StandardExtendedLog info
INFO: Subscribing to MQTT topic /home/sensor
Nov 01, 2016 4:25:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039130380
Nov 01, 2016 4:25:57 PM io.smartspaces.logging.StandardExtendedLog error
SEVERE: Lost MQTT connection to tcp://127.0.0.1:1883
Connection lost (32109) - java.io.EOFException
	at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readByte(DataInputStream.java:267)
	at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
	at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
	... 1 more
Nov 01, 2016 4:26:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039190380
Nov 01, 2016 4:27:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039250380
Nov 01, 2016 4:28:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039310380
Nov 01, 2016 4:29:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039370380
Nov 01, 2016 4:30:30 PM io.smartspaces.logging.StandardExtendedLog formatInfo
INFO: Performing sensor model check at 1478039430380


I restarted the broker between Nov 01, 2016 4:26:30 PM and Nov 01, 2016 4:27:30 PM.

Here are the mosquitto logs:

1478037985: mosquitto version 1.4.8 (build date Fri, 19 Feb 2016 12:03:16 +0100) starting
1478037985: Config loaded from /etc/mosquitto/mosquitto.conf.
1478037985: Opening ipv4 listen socket on port 1883.
1478037985: Opening ipv6 listen socket on port 1883.
1478039070: New connection from 127.0.0.1 on port 1883.
1478039070: New client connected from 127.0.0.1 as /home/sensor/agregator2 (c1, k60).
1478039157: Error in poll: Interrupted system call.
1478039157: mosquitto version 1.4.8 terminating
1478039199: mosquitto version 1.4.8 (build date Fri, 19 Feb 2016 12:03:16 +0100) starting
1478039199: Config loaded from /etc/mosquitto/mosquitto.conf.
1478039199: Opening ipv4 listen socket on port 1883.
1478039199: Opening ipv6 listen socket on port 1883.


You can see my client connect the first time, but the second time it never happens.
Re: Automatic reconnect in the Java client [message #1746637 is a reply to message #1746618] Wed, 02 November 2016 11:20 Go to previous messageGo to next message
James Sutton is currently offline James SuttonFriend
Messages: 71
Registered: July 2015
Member
I can't immediately see anything wrong with your code, I tried writing my own basic example to test Automatic reconnect:

package paho

import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
import org.eclipse.paho.client.mqttv3._

class PahoPublish {

  private var persistence: MqttClientPersistence = null
  private var mqttClient: MqttAsyncClient = null
  private var mqttConnectOptions: MqttConnectOptions = null


  def main(): Unit = {
    println("Eclipse Paho Scala Publish Example")

    persistence = new MqttDefaultFilePersistence()
    mqttConnectOptions = new MqttConnectOptions()
    mqttClient = new MqttAsyncClient("tcp://localhost:1883", "PahoScalaExample", persistence)
    mqttConnectOptions.setAutomaticReconnect(true)
    mqttConnectOptions.setCleanSession(true)

    mqttClient.setCallback(new MqttCallbackExtended {
      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
        println("Paho Extended Callback: Connection to " + serverURI + " completed, reconnect: " + reconnect)
        println("Publishing Message")
        val message: MqttMessage = new MqttMessage("Hello World!".getBytes())
        message.setQos(1)
        mqttClient.publish("hello/world", message)
        println("Message Published")

      }

      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
        println("Paho Extended Callback: Delivery Complete")
      }

      override def messageArrived(topic: String, message: MqttMessage): Unit = {
        println("Paho Extended Callback: Message Arrived to topic: " + topic)
        println("    Message: " + message.getPayload.toString)
      }

      override def connectionLost(cause: Throwable): Unit = {
        println("Paho Extended Callback: Connection Lost: " + cause.getMessage)
      }
    })

    // Connect to the broker
    mqttClient.connect(mqttConnectOptions)
  }
}


(I'm new to scala, so hopefully that looks ok)

When I tested it in a similar way, it was able to successfully reconnect automatically once I re-started my mosquitto broker:

Eclipse Paho Scala Publish Example
Paho Extended Callback: Connection to tcp://localhost:1883 completed, reconnect: false
Publishing Message
Message Published
Paho Extended Callback: Delivery Complete
Paho Extended Callback: Connection Lost: Connection lost
Paho Extended Callback: Connection to tcp://localhost:1883 completed, reconnect: true
Publishing Message
Message Published
Paho Extended Callback: Delivery Complete


1.4.9/sbin [ ./mosquitto -c myConf.conf                                                                                                                                                                                                                               ] 8:36 am
1478085179: mosquitto version 1.4.9 (build date 2016-06-18 16:16:49+0100) starting
1478085179: Config loaded from myConf.conf.
1478085179: Opening ipv4 listen socket on port 1883.
1478085179: Opening ipv6 listen socket on port 1883.
1478085192: New connection from 127.0.0.1 on port 1883.
1478085192: New client connected from 127.0.0.1 as PahoScalaExample (c1, k60).
^C1478085197: mosquitto version 1.4.9 terminating
1.4.9/sbin [ ./mosquitto -c myConf.conf                                                                                                                                                                                                                              ] 11:13 am
1478085202: mosquitto version 1.4.9 (build date 2016-06-18 16:16:49+0100) starting
1478085202: Config loaded from myConf.conf.
1478085202: Opening ipv4 listen socket on port 1883.
1478085202: Opening ipv6 listen socket on port 1883.
1478085204: New connection from 127.0.0.1 on port 1883.
1478085204: New client connected from 127.0.0.1 as PahoScalaExample (c1, k60).


Are you doing anything to the client in "brokerConnectFailure()"? I'm wondering if that is having an effect on it?
Re: Automatic reconnect in the Java client [message #1746654 is a reply to message #1746637] Wed, 02 November 2016 13:48 Go to previous messageGo to next message
Keith Hughes is currently offline Keith HughesFriend
Messages: 4
Registered: February 2015
Junior Member
I am not sure entirely why it came to mind, but this morning as I was waking up I thought "hmm, there is a user context object, I wonder if my options object is being confused as that context object". Sure enough, the 2 argument connect() method uses the context object and the listener and doesn't talk about the options object at all. Once I switched to the 3 argument version of connect(), everything worked fine.

Sorry I had a long post for something so silly. And many thanks for your help, James.

Your Scala looks fine. I am still learning it myself, and perhaps a bit too often my code is Java written in Scala syntax. I am finding I do like the language.
Re: Automatic reconnect in the Java client [message #1746657 is a reply to message #1746654] Wed, 02 November 2016 14:21 Go to previous message
Keith Hughes is currently offline Keith HughesFriend
Messages: 4
Registered: February 2015
Junior Member
For anyone else reading this, because I used the 2 argument connect() method, my options object was used as the user context object. The user context object is handed around as a black box as far as the Paho client code is concerned and because of how I called connect(), the client never knew or cared it was an actual Paho options object that specified that I wanted auto reconnect, etc.

Previous Topic:Paho mqtt cpp project connection errors at build time
Next Topic:Removing Android support libraries
Goto Forum:
  


Current Time: Wed Apr 24 20:00:31 GMT 2024

Powered by FUDForum. Page generated in 0.02599 seconds
.:: Contact :: Home ::.

Powered by: FUDforum 3.0.2.
Copyright ©2001-2010 FUDforum Bulletin Board Software

Back to the top