PAHO SUBSCRIBE ERROR please help me [message #1794267] |
Tue, 28 August 2018 06:54  |
Eclipse User |
|
|
|
Hello
I have a question.
My development environment:
->EMQTTD MQTT BROKER
-> spring boot 2.0.3
compile group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version: '1.2.0'
Scenario
#1. publish simulator (10000 (300byte String) publish, per 1sec) qos0
/ test / monitor / 1 <--- publish
#2. subscribe qos0 simulator / test / monitor / 1
I found an error in this situation as follows.
The MQTT broker did not stop.
The publish simulator (#1) did not stop.
The error appears only in the subscribe simulator (#2). Is this a problem?
please .. help me
-[error/subscribe simulator #2]--------------------------
연결 유실 (32109) - java.io.EOFException
[2m2018-08-28 19:31:05.781[0;39m [32m INFO[0;39m [35m18176[0;39m [2m---[0;39m [2m[ Rec: monitor_1][0;39m [36mcom.vitcon.consumer.VitconConsumer [0;39m [2m:[0;39m 여기냐!
[2m2018-08-28 19:31:05.781[0;39m [32m INFO[0;39m [35m18176[0;39m [2m---[0;39m [2m[ Rec: monitor_1][0;39m [36mcom.vitcon.consumer.VitconConsumer [0;39m [2m:[0;39m 연결 유실
[2m2018-08-28 19:31:05.781[0;39m [32m INFO[0;39m [35m18176[0;39m [2m---[0;39m [2m[ Rec: monitor_1][0;39m [36mcom.vitcon.consumer.VitconConsumer [0;39m [2m:[0;39m 접속 시도 :monitor_1
[2m2018-08-28 19:31:05.782[0;39m [31mERROR[0;39m [35m18176[0;39m [2m---[0;39m [2m[ Rec: monitor_1][0;39m [36mcom.vitcon.consumer.VitconConsumer [0;39m [2m:[0;39m connect failed:MqttException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:181)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:133)
... 7 more
-----------------------------------------------
my mqtt paho source
package com.vitcon.consumer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.vitcon.agent.config.VitconConfig;
/**
 * Base class for MQTT subscribers built on the Eclipse Paho {@link MqttClient}.
 *
 * <p>Typical usage, as far as this class shows: register topics with
 * {@link #setTopic(String)}, call {@link #init()} to create the client, then
 * {@link #connect()} to connect and subscribe. When the connection drops,
 * {@link #connectionLost(Throwable)} keeps retrying {@link #connect()} while
 * {@code config.isRun()} is true.
 *
 * <p>Subclasses supply the client id via {@link #getMqttClientId()} and are
 * expected to override {@link #messageArrived(String, MqttMessage)} and
 * {@link #run()}, which are no-ops here.
 */
public abstract class VitconConsumer implements Runnable, MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(VitconConsumer.class);

    /** Delay between reconnect attempts in {@link #connectionLost(Throwable)}. */
    private static final long RECONNECT_DELAY_MILLIS = 3000L;

    @Autowired
    private VitconConfig config;

    /** Created by {@link #init()}; stays {@code null} until then or if creation fails. */
    private MqttClient mqttClient = null;

    /** Connection options built by {@link #init()} and reused for every (re)connect. */
    private MqttConnectOptions connOpts;

    /** Topics to subscribe to after each successful connect; only the keys are used. */
    private HashMap<String, Boolean> mqttTopics = new HashMap<>();

    /**
     * Disconnects the client if it exists and is currently connected.
     * Does nothing when the client was never created or is already disconnected.
     */
    public void disconnect() {
        // Was "== null && !isConnected()", which dereferenced a null client and
        // never short-circuited for an already-disconnected one.
        if (mqttClient == null || !mqttClient.isConnected()) {
            return;
        }
        try {
            mqttClient.disconnect();
        } catch (MqttException e) {
            // Pass the exception to the logger so the stack trace is kept with the message.
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Builds the connection options and creates the MQTT client with in-memory
     * persistence, registering this object as its callback. Does not connect.
     *
     * @return {@code true} if the client was created, {@code false} if Paho rejected it
     */
    public boolean init() {
        MemoryPersistence persistence = new MemoryPersistence();
        connOpts = new MqttConnectOptions();
        connOpts.setConnectionTimeout(60);
        connOpts.setCleanSession(true);
        connOpts.setMaxInflight(10000);
        // Reconnection is handled manually in connectionLost().
        connOpts.setAutomaticReconnect(false);
        try {
            logger.info("[mqtt]connect:" + config.getMqttUri() + " clientId:" + getMqttClientId());
            mqttClient = new MqttClient(config.getMqttUri(), getMqttClientId(), persistence);
        } catch (MqttException e) {
            logger.error("mqtt connect failed:" + e.getMessage(), e);
            return false;
        }
        mqttClient.setCallback(this);
        return true;
    }

    /**
     * Connects to the broker and subscribes to every registered topic using the
     * configured QoS. Returns immediately if the client is already connected.
     *
     * @return {@code true} if the client is connected when the method returns;
     *         {@code false} if the client was never created or the connection failed
     */
    public boolean connect() {
        if (mqttClient == null) {
            // init() was not called or failed; previously this path threw a NullPointerException.
            logger.error("connect failed:mqtt client is not initialized");
            return false;
        }
        if (mqttClient.isConnected()) {
            return true;
        }
        try {
            logger.info("접속 시도 :" + mqttClient.getClientId());
            mqttClient.connect(connOpts);
            logger.info("접속 완료 :" + mqttClient.getClientId());
            for (String topic : this.mqttTopics.keySet()) {
                logger.info("[mqtt topic]:" + topic + " qos:" + config.getMqttQos());
                mqttClient.subscribe(topic, config.getMqttQos());
            }
        } catch (MqttException e) {
            // MqttSecurityException is a subclass of MqttException and was handled
            // identically, so a single catch covers both.
            logger.error("connect failed:" + e.getMessage(), e);
        }
        // NOTE(review): if a subscribe call fails while the connection stays up, this
        // still returns true with some topics unsubscribed — confirm that is acceptable.
        return mqttClient.isConnected();
    }

    /**
     * Supplies the MQTT client id used when the client is created in {@link #init()}.
     *
     * @return the client id for this consumer
     */
    public abstract String getMqttClientId();

    /**
     * Reports the run flag held by the injected configuration.
     *
     * @return the value of {@code config.isRun()}
     */
    public boolean isRun() {
        return config.isRun();
    }

    /**
     * Registers a topic to be subscribed on the next successful {@link #connect()}.
     *
     * @param topic the topic filter to subscribe to
     */
    public void setTopic(String topic) {
        this.mqttTopics.put(topic, null);
    }

    /**
     * Removes a topic from the set subscribed on connect. An existing subscription
     * on a live connection is not cancelled by this call.
     *
     * @param topic the topic filter to stop subscribing to
     */
    public void removeTopic(String topic) {
        this.mqttTopics.remove(topic);
    }

    /**
     * Logs the loss of connection, then retries {@link #connect()} every three seconds
     * until it succeeds, {@code config.isRun()} becomes false, or the thread is interrupted.
     *
     * @param cause the reason the connection was lost, as reported by Paho
     */
    @Override
    public void connectionLost(Throwable cause) {
        // The throwable goes to the logger instead of printStackTrace() so the trace
        // ends up in the same log as the message.
        logger.info("코넥션 로스트:" + mqttClient.isConnected(), cause);
        logger.info("여기냐!");
        logger.info(cause == null ? null : cause.getMessage());
        while (config.isRun()) {
            if (connect()) {
                return;
            }
            try {
                Thread.sleep(RECONNECT_DELAY_MILLIS);
            } catch (InterruptedException e) {
                logger.error("connectionLost:" + e.getMessage());
                // Restore the interrupt flag and stop retrying; swallowing it would
                // make this loop impossible to cancel.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Called when a message arrives on a subscribed topic. This base implementation
     * ignores the message; subclasses override it to process messages.
     *
     * @param topic   the topic the message was published to
     * @param message the received message
     * @throws Exception declared by the callback interface; never thrown here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Intentionally empty: the original body was an empty try block whose catch
        // could never run.
    }

    /**
     * Called when delivery of a published message completes. Unused by this class.
     *
     * @param token the delivery token of the completed message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // No-op.
    }

    /** Entry point required by {@link Runnable}; a no-op here, for subclasses to override. */
    @Override
    public void run() {
        // No-op.
    }
}
|
|
|
Re: PAHO SUBSCRIBE ERROR please help me [message #1794343 is a reply to message #1794267] |
Wed, 29 August 2018 05:35  |
Eclipse User |
|
|
|
Hi, an EOFException implies that the server has closed the connection to the subscriber. I would suggest looking at the server logs to see why it is doing that. Do you get different results if you change the number of messages being published per second? That may also have an impact.
|
|
|
Powered by
FUDForum. Page generated in 0.03645 seconds