Re: [geomesa-users] Problems adapting geomesa-gdelt-master to ingest other data

You'll need to put the GeoMesa distributed jar into each tablet server's lib/ext directory. That could be the issue.

On 05/16/2014 05:13 PM, Barnhart, Bob M. wrote:

It looks like I’m getting a ClassNotFoundException: geomesa.core.iterators.SpatioTemporalIntersectingIterator from Accumulo.

 

Is there a special Accumulo configuration required so that it can integrate with GeoMesa (e.g., update $ACCUMULO_HOME/conf/accumulo-site.xml to point to one/more GeoMesa .jar files)?

 

 

From: Hunter Provyn [mailto:fhp@xxxxxxxx]
Sent: Friday, May 16, 2014 10:58 AM
To: Barnhart, Bob M.; hunter@xxxxxxxx; geomesa-users@xxxxxxxxxxxxxxxx; geomesa@xxxxxxxx
Subject: Re: [geomesa-users] Problems adapting geomesa-gdelt-master to ingest other data

 

Bob,

To get a more detailed error message, go to that tablet server (127.0.0.1) and check the most recent log file under accumulo/logs.
Then we can help you diagnose further.

Hunter

On 05/16/2014 01:45 PM, Barnhart, Bob M. wrote:

Hunter,

 

Thanks for the suggestions. I defined my time field as “Time:Date”, and added "*geom:Point:srid=4326" to the attributes list.
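
For reference, the revised attribute list looks roughly like this (a sketch of just the changed declaration; compare the original SensorIngest.java listing further down):

static List<String> attributes = Lists.newArrayList(
        "EventID:Integer",
        "Sensor:Integer",
        "Time:Date",                   // was "Time:Integer"
        "Latitude:Float",
        "Longitude:Float",
        "Altitude:Integer",
        "MinAxis:Float",
        "MajAxis:Float",
        "Orientation:Float",
        "*geom:Point:srid=4326"        // default geometry attribute, per Hunter's suggestion
);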

 

There was another problem: my sensor.csv file still had its header row (row[0], containing the column titles), so I removed that row.

 

Now I’m past those errors, but am getting something more obscure:

 

14/05/16 10:17:13 WARN impl.TabletServerBatchReaderIterator: Error on server 127.0.0.1:9997
org.apache.accumulo.core.client.impl.AccumuloServerException: Error on server 127.0.0.1:9997
    at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:704)
    at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator$QueryTask.run(TabletServerBatchReaderIterator.java:361)
    at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
    at org.apache.accumulo.core.util.LoggingRunnable.run(LoggingRunnable.java:34)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.thrift.TApplicationException: Internal error processing startMultiScan
    at org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
    at org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Client.recv_startMultiScan(TabletClientService.java:294)
    at org.apache.accumulo.core.tabletserver.thrift.TabletClientService$Client.startMultiScan(TabletClientService.java:274)
    at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:644)
    ... 7 more

 

I don’t know if this is a problem in my Accumulo installation/configuration or what.

 

Still working on it…

 

Bob

 

From: Hunter Provyn [mailto:fhp@xxxxxxxx]
Sent: Thursday, May 15, 2014 2:50 PM
To: Barnhart, Bob M.; hunter@xxxxxxxx; geomesa-users@xxxxxxxxxxxxxxxx; geomesa@xxxxxxxx
Subject: Re: [geomesa-users] Problems adapting geomesa-gdelt-master to ingest other data

 

Bob,

I now notice one other difference between your version and the original code; it could be the problem. Try adding a final attribute: "*geom:Point:srid=4326".

Hunter


On 05/15/2014 05:40 PM, Barnhart, Bob M. wrote:

Hunter,

 

Thanks for the heads-up and suggestion.

 

My notes indicated that I had (or thought I had) tried “Time:Date” before and gotten an error on the declaration, but I just ran it again and it accepted “Time:Date”. However, even with “Time:Date” I’m still getting the same NullPointerException as before.

 

Bob

 

From: Hunter Provyn [mailto:fhp@xxxxxxxx]
Sent: Thursday, May 15, 2014 1:46 PM
To: geomesa-users@xxxxxxxxxxxxxxxx; Barnhart, Bob M.; geomesa@xxxxxxxx
Subject: Re: [geomesa-users] Problems adapting geomesa-gdelt-master to ingest other data

 

Hi Bob,

One problem I see: a field that is not declared as a Date is being assigned a Date value, while GeoMesa expects the indexed time field to be a Date.
Did you try "Time:Date" instead of "Time:Integer"? The only time class that GeoTools supports is java.util.Date.
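
Since the Time values in your TSV are epoch milliseconds, here is a minimal sketch of what the mapper could do once the field is declared as "Time:Date" (illustrative only; it uses the variable names from the SensorIngestMapper.java you posted):

// Sketch: turn the epoch-millisecond string from the row into a java.util.Date.
// The SimpleDateFormat("yyyyMMdd") used in the current map() expects a calendar-date
// string, not an epoch-millisecond value.
long millis = Long.parseLong(attributes[TIME_COL_IDX]);   // e.g. 1400086863513
simpleFeature.setAttribute("Time", new java.util.Date(millis));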


Hunter

On 05/15/2014 01:18 PM, Barnhart, Bob M. wrote:

(My apologies if this is a re-post due to email-address issues)

 

I have been unable to ingest any of the sample GDELT data cited in the GeoMesa tutorial instructions at http://geomesa.github.io/2014/04/17/geomesa-gdelt-analysis/.

 

Consequently, I tried to adapt the code from https://github.com/ccri/geomesa-gdelt to ingest a different data set.

 

My data (in .CSV format; I converted it to .TSV format for ingestion) looks like:

 

EventID,Sensor,Time,Latitude,Longitude,Altitude,MinAxis,MajAxis,Orientation
1,123,1400086863513,36.966,-82.589,0,792.458,960.222,337.966
2,123,1400086864516,35.783,-82.647,0,229.497,552.263,66.236
3,123,1400086865518,37.215,-84.237,0,283.729,825.958,38.794
4,123,1400086866521,37.609,-82.255,0,737.285,919.335,108.117
5,123,1400086867523,35.053,-82.897,0,710.054,942.109,46.797
6,123,1400086868525,37.145,-84.328,0,695.256,976.911,40.299
7,123,1400086869528,34.224,-84.809,0,645.225,936.959,260.217
8,123,1400086870530,36.826,-85.623,0,430.444,794.505,257.836
9,123,1400086871542,34.846,-86.305,0,727.479,732.795,8.621
10,123,1400086872556,33.861,-86.148,0,700.236,761.757,160.848
. . .

 

(Note: the “Time” field is a Unix/Java-style timestamp, i.e., the number of milliseconds since 1970-01-01.)

 

I adapted the GDELTIngest class to SensorIngest.java as shown below.

 

I also adapted the GDELTIngestMapper class to SensorIngestMapper.java as shown below.

 

I essentially used the technique from the tutorial to upload my data-file to HDFS (after converting to .TSV format).

 

I then used basically the same ingestion command from the tutorial:

 

hadoop jar ./target/geomesa-sensor-1.0-SNAPSHOT.jar geomesa.sensor.SensorIngest \
    -instanceId ntc-irad \
    -zookeepers localhost:2181 \
    -user root \
    -password “…” \
    -auths public \
    -tableName sensor \
    -featureName sensor \
    -ingestFile hdfs:///sensor/sensor.tsv

 

The ingestion begins, the “sensor” table is created in Accumulo, but after processing 4 “entries”, the ingestion fails with the following NullPointerException (in Hadoop’s MapReduce?):

 

java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
    at geomesa.core.data.AccumuloFeatureReader.<init>(AccumuloFeatureReader.scala:42)
    at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:283)
    at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:275)
    at geomesa.core.data.ModifyAccumuloFeatureWriter.<init>(AccumuloFeatureWriter.scala:146)
    at geomesa.core.data.AccumuloDataStore.createFeatureWriter(AccumuloDataStore.scala:294)
    at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:470)
    at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:503)
    at geomesa.sensor.SensorIngestMapper.setup(SensorIngestMapper.java:64)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

14/05/14 13:44:37 INFO mapreduce.Job: Job job_local1747290386_0001 failed with state FAILED due to: NA
14/05/14 13:44:37 INFO mapreduce.Job: Counters: 0
Exception in thread "main" java.lang.Exception: Job failed
    at geomesa.sensor.SensorIngest.runMapReduceJob(SensorIngest.java:156)
    at geomesa.sensor.SensorIngest.main(SensorIngest.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

 

The line tagged with “!!!” in the SensorIngestMapper.java listing is where the code is dying (in the DataStore.getFeatureWriter call).

 

I’m probably missing something simple, but so far I’ve been unable to get this data file to ingest into Accumulo using the GeoMesa/Hadoop mechanism (although I can ingest data directly into Accumulo).

 

Also, I’m not sure what attribute-type to associate with the “Time” field in the “attributes” List. I’ve tried “:Time”, “:Timestamp”, “:Datetime”, etc. with no success, so I’ve had to label it simply as “:Integer”.

 

Any help or suggestions would be greatly appreciated!

 

Regards,

Bob Barnhart (barnhartr@xxxxxxxxxx)

 

// SensorIngest.java ===========================================================================================

package geomesa.sensor;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import geomesa.core.index.Constants;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.DataUtilities;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeatureType;

import java.util.*;

public class SensorIngest {

    static String INSTANCE_ID = "instanceId";
    static String ZOOKEEPERS = "zookeepers";
    static String USER = "user";
    static String PASSWORD = "password";
    static String AUTHS = "auths";
    static String TABLE_NAME = "tableName";
    static String FEATURE_NAME = "featureName";
    static String INGEST_FILE = "ingestFile";
    static String[] ACCUMULO_CONNECTION_PARAMS = new String[]{INSTANCE_ID,
            ZOOKEEPERS, USER, PASSWORD, AUTHS, TABLE_NAME};

    static Options getCommonRequiredOptions() {
        Options options = new Options();
        Option instanceIdOpt = OptionBuilder.withArgName(INSTANCE_ID)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter instanceId")
                .create(INSTANCE_ID);
        Option zookeepersOpt = OptionBuilder.withArgName(ZOOKEEPERS)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter zookeepers")
                .create(ZOOKEEPERS);
        Option userOpt = OptionBuilder.withArgName(USER)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter user")
                .create(USER);
        Option passwordOpt = OptionBuilder.withArgName(PASSWORD)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter password")
                .create(PASSWORD);
        Option authsOpt = OptionBuilder.withArgName(AUTHS)
                .hasArg()
                .withDescription("accumulo connection parameter auths")
                .create(AUTHS);
        Option tableNameOpt = OptionBuilder.withArgName(TABLE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter tableName")
                .create(TABLE_NAME);
        Option featureNameOpt = OptionBuilder.withArgName(FEATURE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("name of feature in accumulo table")
                .create(FEATURE_NAME);
        options.addOption(instanceIdOpt);
        options.addOption(zookeepersOpt);
        options.addOption(userOpt);
        options.addOption(passwordOpt);
        options.addOption(authsOpt);
        options.addOption(tableNameOpt);
        options.addOption(featureNameOpt);
        return options;
    }

    public static Map<String, String> getAccumuloDataStoreConf(CommandLine cmd) {
        Map<String, String> dsConf = new HashMap<String, String>();
        for (String param : ACCUMULO_CONNECTION_PARAMS) {
            dsConf.put(param, cmd.getOptionValue(param));
        }
        if (dsConf.get(AUTHS) == null) dsConf.put(AUTHS, "");
        return dsConf;
    }

    public static void main(String[] args) throws Exception {
        CommandLineParser parser = new BasicParser();
        Options options = getCommonRequiredOptions();
        Option ingestFileOpt = OptionBuilder.withArgName(INGEST_FILE)
                .hasArg()
                .isRequired()
                .withDescription("ingest tsv file on hdfs")
                .create(INGEST_FILE);
        options.addOption(ingestFileOpt);

        CommandLine cmd = parser.parse(options, args);
        Map<String, String> dsConf = getAccumuloDataStoreConf(cmd);

        String featureName = cmd.getOptionValue(FEATURE_NAME); // e.g. "sensor"
        SimpleFeatureType featureType = buildFeatureType(featureName);

        DataStore ds = DataStoreFinder.getDataStore(dsConf);
        ds.createSchema(featureType);

        runMapReduceJob(featureName,
                "GeoMesa SensorEvent Ingest",
                SensorIngestMapper.class,
                dsConf,
                new Path(cmd.getOptionValue(INGEST_FILE)));
    }

    private static void runMapReduceJob(String featureName, String jobName, Class mapper,
                                        Map<String, String> dsConf,
                                        Path mapredCSVFilePath) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(mapper);
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setJobName(jobName);
        Configuration conf = job.getConfiguration();
        for (String key : dsConf.keySet()) {
            conf.set(key, dsConf.get(key));
        }
        conf.set(FEATURE_NAME, featureName);
        FileSystem fs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, mapredCSVFilePath);
        Path tmpPath = new Path("///tmp");
        if (!fs.exists(tmpPath)) {
            fs.create(tmpPath);
        }
        Path outputDir = new Path("///tmp", "geomesa-" + featureName + "-output");
        if (fs.exists(outputDir)) {
            // remove this directory, if it already exists
            fs.delete(outputDir, true);
        }
        Path hdfsJarPath = new Path("///tmp", "geomesa-" + featureName + "-1.0-SNAPSHOT.jar");
        if (fs.exists(hdfsJarPath)) {
            // remove this jar, if it already exists
            fs.delete(hdfsJarPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        fs.copyFromLocalFile(new Path("target/geomesa-" + featureName + "-1.0-SNAPSHOT.jar"), hdfsJarPath);
        for (FileStatus path : fs.listStatus(hdfsJarPath)) {
            job.addArchiveToClassPath(new Path(path.getPath().toUri().getPath()));
        }

        job.submit();

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }

    private static SimpleFeatureType buildFeatureType(String featureName) throws SchemaException {
        String name = featureName;
        String spec = Joiner.on(",").join(attributes);
        SimpleFeatureType featureType = DataUtilities.createType(name, spec);
        featureType.getUserData().put(Constants.SF_PROPERTY_START_TIME, "Time"); // Tells GeoMesa to use "Time" as the Start Time index
        return featureType;
    }

    static List<String> attributes = Lists.newArrayList(
            "EventID:Integer",
            "Sensor:Integer",
            "Time:Integer",      // TODO: ISSUE I001 - actually a "Unix" time (e.g., millis since 1970.01.01)
            "Latitude:Float",
            "Longitude:Float",
            "Altitude:Integer",
            "MinAxis:Float",
            "MajAxis:Float",
            "Orientation:Float"
    );
}

 

// SensorIngestMapper.java ====================================================================================

package geomesa.sensor;

import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

public class SensorIngestMapper extends Mapper<LongWritable,Text,Key,Value> {

    // Columns: EventID, Sensor, Time, Latitude, Longitude, Altitude, MinAxis, MajAxis, Orientation
    private static int ID_COL_IDX = 0;
    private static int TIME_COL_IDX = 2;
    private static int LATITUDE_COL_IDX  = 3;
    private static int LONGITUDE_COL_IDX = 4;
    private static int MINIMUM_NUM_FIELDS = 9;

    private SimpleFeatureType featureType = null;
    private FeatureWriter<SimpleFeatureType, SimpleFeature> featureWriter = null;
    private SimpleFeatureBuilder featureBuilder;
    private GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();

    private static void _(String s) {
        System.out.println("(" + s + ")");
    }

    public void setup(Mapper<LongWritable,Text,Key,Value>.Context context) throws IOException, InterruptedException {
        super.setup(context);
        _("SensorIngestMapper.setup: ENTER...");

        Map<String, String> connectionParams = new HashMap<String, String>();
        connectionParams.put("instanceId", context.getConfiguration().get("instanceId"));
        connectionParams.put("zookeepers", context.getConfiguration().get("zookeepers"));
        connectionParams.put("user", context.getConfiguration().get("user"));
        connectionParams.put("password", context.getConfiguration().get("password"));
        connectionParams.put("auths", context.getConfiguration().get("auths"));
        connectionParams.put("tableName", context.getConfiguration().get("tableName"));

        String featureName = context.getConfiguration().get("featureName");
        _("SensorIngestMapper.setup: featureName=" + featureName);
        DataStore ds = DataStoreFinder.getDataStore(connectionParams);
        featureType = ds.getSchema(featureName);
        _("SensorIngestMapper.setup: featureType=" + featureType.getTypeName());
        featureBuilder = new SimpleFeatureBuilder(featureType);
        _("SensorIngestMapper.setup: featureBuilder OK...");
        featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT); // NullPointerException on this call!!!
        _("SensorIngestMapper.setup: RETURN.");
    }

    public void map(LongWritable key, Text value, Mapper<LongWritable,Text,Key,Value>.Context context) {
        _("SensorIngestMapper.map: ENTER...");
        String[] attributes = value.toString().split("\\t", -1);
        if (attributes.length >= MINIMUM_NUM_FIELDS && !attributes[LATITUDE_COL_IDX].equals("") && !attributes[LONGITUDE_COL_IDX].equals("")) {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
            try {
                featureBuilder.reset();
                featureBuilder.addAll(attributes);

                Double lat = Double.parseDouble(attributes[LATITUDE_COL_IDX]); // TODO: update for Lat/Lon offsets
                Double lon = Double.parseDouble(attributes[LONGITUDE_COL_IDX]);

                Geometry geom = geometryFactory.createPoint(new Coordinate(lon, lat));
                SimpleFeature simpleFeature = featureBuilder.buildFeature(attributes[ID_COL_IDX]);
                simpleFeature.setAttribute("Time", formatter.parse(attributes[TIME_COL_IDX]));
                simpleFeature.setDefaultGeometry(geom);

                try {
                    SimpleFeature next = featureWriter.next();
                    for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
                        next.setAttribute(i, simpleFeature.getAttribute(i));
                    }
                    ((FeatureIdImpl) next.getIdentifier()).setID(simpleFeature.getID());
                    featureWriter.write();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        _("SensorIngestMapper.map: RETURN ...");
    }
}

 

Bob Barnhart

Chief Systems Engineer | 858 826 5596 (Office) | 619 972 9489 (Mobile) | barnhartr@xxxxxxxxxx

 

 

 

 

Bob Barnhart

Chief Systems Engineer | 858 826 5596 (Office) | 619 972 9489 (Mobile) | Robert.M.Barnhart@xxxxxxxxxx

 

 






_______________________________________________
geomesa-users mailing list
geomesa-users@xxxxxxxxxxxxxxxx
http://www.locationtech.org/mailman/listinfo/geomesa-users

 

 

 


