[
Date Prev][
Date Next][
Thread Prev][
Thread Next][
Date Index][
Thread Index]
[
List Home]
Re: [mosquitto-dev] Mosquitto broker consumes too much memory and never release it.
|
Hi Roger,
As suggested, I just tried it with 1.4 and it's still reproducible. The difference is on 1.3.1 memory consumption goes up even when I try to publish 25000 messages, but it takes 50000 messages to reproduce same issue with 1.4.
And it does not releases memory in any of the two versions.
Attached is my source code for my sample test case.
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
public class MqttOperations {
private MqttClient client;
public MqttOperations(String clientId) {
try {
client = new MqttClient("tcp://192.168.0.129:1883", clientId, null);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void connect() {
try {
client.setCallback(new TestMqttCallback());
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(100000);
options.setConnectionTimeout(10);
options.setCleanSession(true);
client.setTimeToWait(10 * 1000);
client.connect(options);
if (client.isConnected()) {
System.out.println("Connected: " + client.isConnected());
client.subscribe("abc");
}
} catch (MqttSecurityException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
public void subscribe(String topic) {
try {
client.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void publish(String message) {
try {
// client.publish("abc", message.getBytes(), 0, false);
client.getTopic("abc").publish(message.getBytes(), 0, false);
} catch (MqttException e) {
e.printStackTrace();
}
}
private class TestMqttCallback implements MqttCallback {
@Override
public void connectionLost(Throwable arg0) {
System.out.println("Connection Lost");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
System.out.println(topic + " : " + message);
}
}
}
public class TestMosquitto {
public static void main(String args[]) {
TestMosquitto testMosquitto = new TestMosquitto();
new Thread(testMosquitto.new MyThread("client1")).start();
new Thread(testMosquitto.new MyThread("client2")).start();
new Thread(testMosquitto.new MyThread("client3")).start();
new Thread(testMosquitto.new MyThread("client4")).start();
new Thread(testMosquitto.new MyThread("client5")).start();
}
class MyThread implements Runnable {
private String clientId;
public MyThread(String clientId) {
this.clientId = clientId;
}
@Override
public void run() {
MqttOperations mqtt = new MqttOperations(clientId);
mqtt.connect();
mqtt.subscribe("abc");
long startTime = System.currentTimeMillis();
for (int i = 0; i < 50000; i++) {
mqtt.publish("Client: " + clientId + ",Message: " + i);
}
long endTime = System.currentTimeMillis();
System.out.println("Time To Execute: " + (endTime - startTime));
}
}
}