Skip to main content


Eclipse Community Forums
Forum Search:

Search      Help    Register    Login    Home
Home » Eclipse Projects » Kura » mqtt (How to subscribe the published data)
mqtt [message #1776749] Tue, 21 November 2017 07:50 Go to next message
gopal korrapati is currently offline gopal korrapatiFriend
Messages: 48
Registered: October 2017
Member
Hi,
I am following the demo heater example bundle already existing in kura . I am able to publishing data.but How can i subscribe the data.these are the logs after running the bundle.
  • Attachment: kura_log.log
    (Size: 9.12KB, Downloaded 196 times)
Re: mqtt [message #1776759 is a reply to message #1776749] Tue, 21 November 2017 09:18 Go to previous messageGo to next message
Matteo Maiero is currently offline Matteo MaieroFriend
Messages: 423
Registered: July 2015
Location: Italy
Senior Member
Hi,
If your broker/cloud platform does not support data displaying, I believe you can go with different options: with another device/computer use an mqtt client to perform the subscription or you can use a java application to do that https://github.com/eurotech/edc-examples or, finally, use kura wires to perform everything locally.

Best regards,
Matteo
Re: mqtt [message #1776823 is a reply to message #1776759] Wed, 22 November 2017 04:17 Go to previous messageGo to next message
gopal korrapati is currently offline gopal korrapatiFriend
Messages: 48
Registered: October 2017
Member
this is the code i am using and trying to debugging my code before publishing i am able to printing that data. after that I am open mymqtt app in my mobile. I am connected to same server like mqtt://iot.eclipse.org and same topic.I am starting to subscribing to same topic but I not getting anything in dashboard?


package org.eclipse.kura.demo.heater;

import java.util.Date;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.eclipse.kura.cloud.CloudClient;
import org.eclipse.kura.cloud.CloudClientListener;
import org.eclipse.kura.cloud.CloudService;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.message.KuraPayload;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.ComponentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Heater implements ConfigurableComponent, CloudClientListener {

private static final Logger s_logger = LoggerFactory.getLogger(Heater.class);

// Cloud Application identifier
private static final String APP_ID = "heater";

// Publishing Property Names
private static final String MODE_PROP_NAME = "mode";
private static final String MODE_PROP_PROGRAM = "Program";
private static final String MODE_PROP_MANUAL = "Manual";
private static final String MODE_PROP_VACATION = "Vacation";

private static final String PROGRAM_SETPOINT_NAME = "program.setPoint";
private static final String MANUAL_SETPOINT_NAME = "manual.setPoint";

private static final String TEMP_INITIAL_PROP_NAME = "temperature.initial";
private static final String TEMP_INCREMENT_PROP_NAME = "temperature.increment";

private static final String PUBLISH_RATE_PROP_NAME = "publish.rate";
private static final String PUBLISH_TOPIC_PROP_NAME = "publish.semanticTopic";
private static final String PUBLISH_QOS_PROP_NAME = "publish.qos";
private static final String PUBLISH_RETAIN_PROP_NAME = "publish.retain";

private CloudService m_cloudService;
private CloudClient m_cloudClient;

private final ScheduledExecutorService m_worker;
private ScheduledFuture<?> m_handle;

private float m_temperature;
private Map<String, Object> m_properties;
private final Random m_random;

// ----------------------------------------------------------------
//
// Dependencies
//
// ----------------------------------------------------------------

public Heater() {
super();
this.m_random = new Random();
this.m_worker = Executors.newSingleThreadScheduledExecutor();
}

public void setCloudService(CloudService cloudService) {
this.m_cloudService = cloudService;
}

public void unsetCloudService(CloudService cloudService) {
this.m_cloudService = null;
}

// ----------------------------------------------------------------
//
// Activation APIs
//
// ----------------------------------------------------------------

protected void activate(ComponentContext componentContext, Map<String, Object> properties) {
s_logger.info("Activating Heater...");

this.m_properties = properties;
for (String s : properties.keySet()) {
s_logger.info("Activate - " + s + ": " + properties.get(s));
}

// get the mqtt client for this application
try {

// Acquire a Cloud Application Client for this Application
s_logger.info("Getting CloudClient for {}...", APP_ID);
this.m_cloudClient = this.m_cloudService.newCloudClient(APP_ID);
this.m_cloudClient.addCloudClientListener(this);

// Don't subscribe because these are handled by the default
// subscriptions and we don't want to get messages twice
doUpdate(false);
} catch (Exception e) {
s_logger.error("Error during component activation", e);
throw new ComponentException(e);
}
s_logger.info("Activating Heater... Done.");
}

protected void deactivate(ComponentContext componentContext) {
s_logger.debug("Deactivating Heater...");

// shutting down the worker and cleaning up the properties
this.m_worker.shutdown();

// Releasing the CloudApplicationClient
s_logger.info("Releasing CloudApplicationClient for {}...", APP_ID);
this.m_cloudClient.release();

s_logger.debug("Deactivating Heater... Done.");
}

public void updated(Map<String, Object> properties) {
s_logger.info("Updated Heater...");

// store the properties received
this.m_properties = properties;
for (String s : properties.keySet()) {
s_logger.info("Update - " + s + ": " + properties.get(s));
}

// try to kick off a new job
doUpdate(true);
s_logger.info("Updated Heater... Done.");
}

// ----------------------------------------------------------------
//
// Cloud Application Callback Methods
//
// ----------------------------------------------------------------

@Override
public void onControlMessageArrived(String deviceId, String appTopic, KuraPayload msg, int qos, boolean retain) {
// TODO Auto-generated method stub
s_logger.info("onControlMessageArrived");
}

@Override
public void onMessageArrived(String deviceId, String appTopic, KuraPayload msg, int qos, boolean retain) {
// TODO Auto-generated method stub
s_logger.info("onMessageArrived" + msg.getMetric("temperatureInternal"));
}

@Override
public void onConnectionLost() {
// TODO Auto-generated method stub

}

@Override
public void onConnectionEstablished() {
// TODO Auto-generated method stub

}

@Override
public void onMessageConfirmed(int messageId, String appTopic) {
// TODO Auto-generated method stub

}

@Override
public void onMessagePublished(int messageId, String appTopic) {
// TODO Auto-generated method stub
s_logger.info("onMessagepublished" + appTopic);

}

// ----------------------------------------------------------------
//
// Private Methods
//
// ----------------------------------------------------------------

/**
* Called after a new set of properties has been configured on the service
*/
private void doUpdate(boolean onUpdate) {
// cancel a current worker handle if one if active
if (this.m_handle != null) {
this.m_handle.cancel(true);
}

if (!this.m_properties.containsKey(TEMP_INITIAL_PROP_NAME)
|| !this.m_properties.containsKey(PUBLISH_RATE_PROP_NAME)) {
s_logger.info(
"Update Heater - Ignore as properties do not contain TEMP_INITIAL_PROP_NAME and PUBLISH_RATE_PROP_NAME.");
return;
}

// reset the temperature to the initial value
if (!onUpdate) {
this.m_temperature = (Float) this.m_properties.get(TEMP_INITIAL_PROP_NAME);
}

// schedule a new worker based on the properties of the service
int pubrate = (Integer) this.m_properties.get(PUBLISH_RATE_PROP_NAME);
this.m_handle = this.m_worker.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
Thread.currentThread().setName(getClass().getSimpleName());
doPublish();
// publish();
}
}, 0, pubrate, TimeUnit.MINUTES);
}

/**
* Called at the configured rate to publish the next temperature measurement.
*/

// private void publish() {
// String topic = (String) this.m_properties.get(PUBLISH_TOPIC_PROP_NAME);
// Integer qos = (Integer) this.m_properties.get(PUBLISH_QOS_PROP_NAME);
// Boolean retain = (Boolean) this.m_properties.get(PUBLISH_RETAIN_PROP_NAME);
// String mode = (String) this.m_properties.get(MODE_PROP_NAME);
//
// // Publish the message
// try {
// s_logger.info("payload content :- " + payload.getMetric("temperatureInternal"));
// this.m_cloudClient.publish(topic, payload, qos, retain);
// s_logger.info("Published to {} message: {}", topic, payload);
// } catch (Exception e) {
// s_logger.error("Cannot publish topic: " + topic, e);
// }
//
// }
//
//

private void doPublish() {
// fetch the publishing configuration from the publishing properties
String topic = (String) this.m_properties.get(PUBLISH_TOPIC_PROP_NAME);
Integer qos = (Integer) this.m_properties.get(PUBLISH_QOS_PROP_NAME);
Boolean retain = (Boolean) this.m_properties.get(PUBLISH_RETAIN_PROP_NAME);
String mode = (String) this.m_properties.get(MODE_PROP_NAME);

// Increment the simulated temperature value
float setPoint = 0;
float tempIncr = (Float) this.m_properties.get(TEMP_INCREMENT_PROP_NAME);
if (MODE_PROP_PROGRAM.equals(mode)) {
setPoint = (Float) this.m_properties.get(PROGRAM_SETPOINT_NAME);
} else if (MODE_PROP_MANUAL.equals(mode)) {
setPoint = (Float) this.m_properties.get(MANUAL_SETPOINT_NAME);
} else if (MODE_PROP_VACATION.equals(mode)) {
setPoint = 6.0F;
}
if (this.m_temperature + tempIncr < setPoint) {
this.m_temperature += tempIncr;
} else {
this.m_temperature -= 4 * tempIncr;
}

// Allocate a new payload
KuraPayload payload = new KuraPayload();

// Timestamp the message
payload.setTimestamp(new Date());

// Add the temperature as a metric to the payload
payload.addMetric("temperatureInternal", this.m_temperature);
payload.addMetric("temperatureExternal", 5.0F);
payload.addMetric("temperatureExhaust", 30.0F);

int code = this.m_random.nextInt();
if (this.m_random.nextInt() % 5 == 0) {
payload.addMetric("errorCode", code);
} else {
payload.addMetric("errorCode", 0);
}

// Publish the message
try {
s_logger.info("payload content :- " + payload.getMetric("temperatureInternal"));
this.m_cloudClient.publish(topic, payload, qos, retain);
s_logger.info("Published to {} message: {}", topic, payload);
} catch (Exception e) {
s_logger.error("Cannot publish topic: " + topic, e);
}

// subscribe the message
//
// try {
// this.m_cloudClient.subscribe(topic, qos);
// s_logger.info("subscribe the {} message: {}", topic, qos);
// } catch (Exception e) {
// s_logger.error("Cannot publish topic: " + topic, e);
// }
}
Re: mqtt [message #1776840 is a reply to message #1776823] Wed, 22 November 2017 08:27 Go to previous messageGo to next message
Matteo Maiero is currently offline Matteo MaieroFriend
Messages: 423
Registered: July 2015
Location: Italy
Senior Member
Does your mqtt app support the Kura Protobuf format? If not, it's likely that you are not able to decode the received messages.

Best regards,
Matteo
Re: mqtt [message #1776855 is a reply to message #1776840] Wed, 22 November 2017 09:55 Go to previous messageGo to next message
gopal korrapati is currently offline gopal korrapatiFriend
Messages: 48
Registered: October 2017
Member
thanks for your reply

If I publish the data through demo heater example.where it will store?
now I got new error like connection problem in web UI.but in osgi i am typing the ss and ls, cloudservice is showing ACTIVE.after that I tried telnet iot.eclipse.org it showing as below attachments.file:///home/gopal/Pictures/telnet.png file:///home/gopal/Pictures/connection%20error.png
Re: mqtt [message #1776857 is a reply to message #1776855] Wed, 22 November 2017 10:10 Go to previous messageGo to next message
gopal korrapati is currently offline gopal korrapatiFriend
Messages: 48
Registered: October 2017
Member
file:///home/gopal/Pictures/connection%20error.png

file:///home/gopal/Pictures/telnet.png
Re: mqtt [message #1776941 is a reply to message #1776857] Thu, 23 November 2017 04:25 Go to previous messageGo to next message
gopal korrapati is currently offline gopal korrapatiFriend
Messages: 48
Registered: October 2017
Member
thanks for support my problem is solved
Re: mqtt [message #1776994 is a reply to message #1776941] Thu, 23 November 2017 13:31 Go to previous message
Matteo Maiero is currently offline Matteo MaieroFriend
Messages: 423
Registered: July 2015
Location: Italy
Senior Member
Great.
Thanks for the update.

Maybe if you could provide an idea about the solution you achieved, it could help others with the same problem.

Best regards,
Matteo
Previous Topic:kura-console.log
Next Topic:Wires without 'Timer'
Goto Forum:
  


Current Time: Tue Apr 23 14:37:06 GMT 2024

Powered by FUDForum. Page generated in 0.04270 seconds
.:: Contact :: Home ::.

Powered by: FUDforum 3.0.2.
Copyright ©2001-2010 FUDforum Bulletin Board Software

Back to the top