PAHO SUBSCRIBE ERROR please help me [message #1794267] |
Tue, 28 August 2018 10:54 |
WONTAE KIM Messages: 1 Registered: August 2018 |
Junior Member |
|
|
Hello
I have a question.
My develop resource
->EMQTTD MQTT BROKER
-> spring boot 2.0.3
compile group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version: '1.2.0'
senario
#1. publish simulator (10000 (300byte String) publish, per 1sec) qos0
/ test / monitor / 1 <--- publish
#2. subscribe qos0 simulator / test / monitor / 1
I found an error in this situation as follows.
MQTT BROKER did not stop
PUBLISH # 1 simulator did not stop.
This error only appears in SUBSCRIBE # 2. Is this a problem?
please .. help me
-[error/subscribe simulartor #2]--------------------------
연결 유실 (32109) - java.io.EOFException
[2m2018-08-28 19:31:05.781[0;39m [32m INFO[0;39m [35m18176[0;39m [2m---[0;39m [2m[ Rec: monitor_1][0;39m [36mcom.vitcon.consumer.VitconConsumer [0;39m [2m:[0;39m 여기냐!
[2m2018-08-28 19:31:05.781[0;39m [32m INFO[0;39m [35m18176[0;39m [2m---[0;39m [2m[ Rec: monitor_1][0;39m [36mcom.vitcon.consumer.VitconConsumer [0;39m [2m:[0;39m 연결 유실
[2m2018-08-28 19:31:05.781[0;39m [32m INFO[0;39m [35m18176[0;39m [2m---[0;39m [2m[ Rec: monitor_1][0;39m [36mcom.vitcon.consumer.VitconConsumer [0;39m [2m:[0;39m 접속 시도 :monitor_1
[2m2018-08-28 19:31:05.782[0;39m [31mERROR[0;39m [35m18176[0;39m [2m---[0;39m [2m[ Rec: monitor_1][0;39m [36mcom.vitcon.consumer.VitconConsumer [0;39m [2m:[0;39m connect failed:MqttException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:181)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:133)
... 7 more
-----------------------------------------------
my mqtt paho source
package com.vitcon.consumer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.vitcon.agent.config.VitconConfig;
public abstract class VitconConsumer implements Runnable, MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(VitconConsumer.class);
@Autowired
private VitconConfig config;
private MqttClient mqttClient = null;
private MqttConnectOptions connOpts;
private HashMap<String, Boolean> mqttTopics = new HashMap<>();
public void disconnect() {
if (mqttClient == null && !mqttClient.isConnected())
return;
try {
mqttClient.disconnect();
} catch (MqttException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
e.printStackTrace();
}
}
public boolean init() {
MemoryPersistence persistence = new MemoryPersistence();
connOpts = new MqttConnectOptions();
connOpts.setConnectionTimeout(60);
connOpts.setCleanSession(true);
connOpts.setMaxInflight(10000);
connOpts.setAutomaticReconnect(false);
//connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
try {
// mqttClient = new MqttClient(config.getMqttUri(), config.getMqttClientId(), persistence);
logger.info("[mqtt]connect:" + config.getMqttUri() + " clientId:" + getMqttClientId());
mqttClient = new MqttClient(config.getMqttUri(), getMqttClientId(), persistence);
} catch (MqttException e) {
// TODO Auto-generated catch block
logger.error("mqtt connect failed:" + e.getMessage());
return false;
}
mqttClient.setCallback(this);
return true;
}
public boolean connect() {
if (mqttClient != null && mqttClient.isConnected())
return true;
try {
logger.info("접속 시도 :" + mqttClient.getClientId());
mqttClient.connect(connOpts);
logger.info("접속 완료 :" + mqttClient.getClientId());
Iterator<String> e = this.mqttTopics.keySet().iterator();
while (e.hasNext()) {
String topic = e.next();
logger.info("[mqtt topic]:" + topic + " qos:"+config.getMqttQos());
mqttClient.subscribe(topic, config.getMqttQos());
// logger.info("set topic:" + topic);
}
} catch (MqttSecurityException e) {
// TODO Auto-generated catch block
logger.error("connect failed:" + e.getMessage());
} catch (MqttException e) {
// TODO Auto-generated catch block
logger.error("connect failed:" + e.getMessage());
}
if (mqttClient.isConnected())
return true;
return false;
}
public abstract String getMqttClientId();
public boolean isRun() {
return config.isRun();
}
public void setTopic(String topic) {
this.mqttTopics.put(topic, null);
}
public void removeTopic(String topic) {
this.mqttTopics.remove(topic);
}
@Override
public void connectionLost(Throwable cause) {
logger.info("코넥션 로스트:" + mqttClient.isConnected());
cause.printStackTrace();
logger.info("여기냐!");
logger.info(cause.getMessage());
// TODO Auto-generated method stub
while (config.isRun()) {
if (connect())
return;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
logger.error("connectionLost:" + e.getMessage());
}
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
try {
} catch (Exception e) {
e.printStackTrace();
logger.error("wontae" + e.getMessage());
// TODO: handle exception
}
// TODO Auto-generated method stub
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
}
@Override
public void run() {
// TODO Auto-generated method stub
}
}
|
|
|
|
Powered by
FUDForum. Page generated in 0.03573 seconds