Skip to main content


Eclipse Community Forums
Forum Search:

Search      Help    Register    Login    Home
Home » Eclipse Projects » Paho » PAHO SUBSCRIBE ERROR please help me
PAHO SUBSCRIBE ERROR please help me [message #1794267] Tue, 28 August 2018 10:54 Go to next message
WONTAE KIM is currently offline WONTAE KIMFriend
Messages: 1
Registered: August 2018
Junior Member
Hello

I have a question.

My develop resource
->EMQTTD MQTT BROKER
-> spring boot 2.0.3
compile group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version: '1.2.0'

senario
#1. publish simulator (10000 (300byte String) publish, per 1sec) qos0
     / test / monitor / 1 <--- publish

#2. subscribe qos0 simulator / test / monitor / 1
   
I found an error in this situation as follows.


MQTT BROKER did not stop
PUBLISH # 1 simulator did not stop.
This error only appears in SUBSCRIBE # 2. Is this a problem?

please .. help me

-[error/subscribe simulartor #2]--------------------------
연결 유실 (32109) - java.io.EOFException
2018-08-28 19:31:05.781  INFO 18176 --- [ Rec: monitor_1] com.vitcon.consumer.VitconConsumer  : 여기냐!
2018-08-28 19:31:05.781  INFO 18176 --- [ Rec: monitor_1] com.vitcon.consumer.VitconConsumer  : 연결 유실
2018-08-28 19:31:05.781  INFO 18176 --- [ Rec: monitor_1] com.vitcon.consumer.VitconConsumer  : 접속 시도 :monitor_1
2018-08-28 19:31:05.782 ERROR 18176 --- [ Rec: monitor_1] com.vitcon.consumer.VitconConsumer  : connect failed:MqttException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:181)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:133)
... 7 more

-----------------------------------------------
my mqtt paho source
package com.vitcon.consumer;

import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.vitcon.agent.config.VitconConfig;

public abstract class VitconConsumer implements Runnable, MqttCallback {

private static final Logger logger = LoggerFactory.getLogger(VitconConsumer.class);

@Autowired
private VitconConfig config;

private MqttClient mqttClient = null;
private MqttConnectOptions connOpts;
private HashMap<String, Boolean> mqttTopics = new HashMap<>();

public void disconnect() {
if (mqttClient == null && !mqttClient.isConnected())
return;
try {
mqttClient.disconnect();
} catch (MqttException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
e.printStackTrace();
}
}

public boolean init() {
MemoryPersistence persistence = new MemoryPersistence();
connOpts = new MqttConnectOptions();
connOpts.setConnectionTimeout(60);
connOpts.setCleanSession(true);
connOpts.setMaxInflight(10000);
connOpts.setAutomaticReconnect(false);
//connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

try {
// mqttClient = new MqttClient(config.getMqttUri(), config.getMqttClientId(), persistence);
logger.info("[mqtt]connect:" + config.getMqttUri() + " clientId:" + getMqttClientId());
mqttClient = new MqttClient(config.getMqttUri(), getMqttClientId(), persistence);
} catch (MqttException e) {
// TODO Auto-generated catch block
logger.error("mqtt connect failed:" + e.getMessage());
return false;
}

mqttClient.setCallback(this);

return true;
}

public boolean connect() {
if (mqttClient != null && mqttClient.isConnected())
return true;

try {
logger.info("접속 시도 :" + mqttClient.getClientId());
mqttClient.connect(connOpts);
logger.info("접속 완료 :" + mqttClient.getClientId());

Iterator<String> e = this.mqttTopics.keySet().iterator();
while (e.hasNext()) {
String topic = e.next();
logger.info("[mqtt topic]:" + topic + " qos:"+config.getMqttQos());
mqttClient.subscribe(topic, config.getMqttQos());
// logger.info("set topic:" + topic);
}
} catch (MqttSecurityException e) {
// TODO Auto-generated catch block
logger.error("connect failed:" + e.getMessage());
} catch (MqttException e) {
// TODO Auto-generated catch block
logger.error("connect failed:" + e.getMessage());
}

if (mqttClient.isConnected())
return true;


return false;
}

public abstract String getMqttClientId();

public boolean isRun() {
return config.isRun();
}

public void setTopic(String topic) {
this.mqttTopics.put(topic, null);
}

public void removeTopic(String topic) {
this.mqttTopics.remove(topic);
}

@Override
public void connectionLost(Throwable cause) {

logger.info("코넥션 로스트:" + mqttClient.isConnected());
cause.printStackTrace();
logger.info("여기냐!");
logger.info(cause.getMessage());
// TODO Auto-generated method stub
while (config.isRun()) {
if (connect())
return;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
logger.error("connectionLost:" + e.getMessage());
}
}
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {

try {

} catch (Exception e) {
e.printStackTrace();
logger.error("wontae" + e.getMessage());
// TODO: handle exception
}

// TODO Auto-generated method stub

}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub

}

@Override
public void run() {
// TODO Auto-generated method stub

}


}




Re: PAHO SUBSCRIBE ERROR please help me [message #1794343 is a reply to message #1794267] Wed, 29 August 2018 09:35 Go to previous message
James Sutton is currently offline James SuttonFriend
Messages: 71
Registered: July 2015
Member
Hi, an EOFException implies that the server has closed the connection to the subscriber. I would suggest looking at the server logs to see why it is doing that. Do you get different results if you change the number of messages being published per second? That may also have an impact.
Previous Topic:MQTTClient_connect always gives -1
Next Topic:C Paho Api Threading Issue
Goto Forum:
  


Current Time: Sat Apr 20 04:42:43 GMT 2024

Powered by FUDForum. Page generated in 0.03114 seconds
.:: Contact :: Home ::.

Powered by: FUDforum 3.0.2.
Copyright ©2001-2010 FUDforum Bulletin Board Software

Back to the top