Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[iot-kapua-sp] Kapua- Camel Mqtt COnnection

	Hi I a have connected Kura and kapua and the data is getting published from kura to kapua. This I can see it in Web UI of Kapua. I have used example publisher from ecllipse community by installing it as a package using kura UI. I want to get the data from kapua into kafka. For that I am using Apache camel. I used camel mqtt component to connect to kapua. I am able to connect well(I can see the connection in the kapua web UI.) But the data is not getting transferred. Can you help me with this? This is my code.



import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.main.Main;
import org.apache.camel.main.MainListenerSupport;
import org.apache.camel.main.MainSupport;

import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.Properties;

public class MainExample {

    public static void main(String[] args) throws Exception {
        MainExample example = new MainExample();
        example.boot();
    }
 
    private void boot() throws Exception {
        // create a Main instance
        Main main = new Main();
        main.addRouteBuilder(new MyRouteBuilder());
        // add event listener
        main.addMainListener(new Events());
        // run until you terminate the JVM
        System.out.println("Starting Camel. Use ctrl + c to terminate the JVM.\n");
        main.run();
    }
 
    private static class MyRouteBuilder extends RouteBuilder {
        @Override
        public void configure() throws Exception {

            from("mqtt://kapua?host=tcp://kapua_mqtt_brokeruri:1883&subscribeTopicName=EXAMPLE_PUBLISHER/data/metrics&userName=kapua-sys&password=kapua-password&clientId=kafka")
                    .process(exchange -> System.out.println("Invoked timer at " + new Date()))
                    .process(new MyLogProcessor())
                    .to("kafka:kafka-kapua?brokers=kafka_brokeruri:9092");

        }
    }
 
    public static class Events extends MainListenerSupport {
 
        @Override
        public void afterStart(MainSupport main) {
            System.out.println("MainExample with Camel is now started!");
        }
 
        @Override
        public void beforeStop(MainSupport main) {
            System.out.println("MainExample with Camel is now being stopped!");
        }
    }


    public class MyLogProcessor implements Processor {
    
        public void process(Exchange exchange) throws Exception {
            System.out.println(new Date()+":Now processing the String : " + exchange.getIn().getBody(String.class));    }
    }
}

















Back to the top