(My apologies if this is a re-post due to email-address issues.)
I have experienced problems ingesting any of the sample GDELT data cited in the GeoMesa tutorial instructions at http://geomesa.github.io/2014/04/17/geomesa-gdelt-analysis/. Consequently, I tried to adapt the code from https://github.com/ccri/geomesa-gdelt to ingest a different data set.

My data (CSV format; I converted it to TSV for ingestion) looks like:
EventID,Sensor,Time,Latitude,Longitude,Altitude,MinAxis,MajAxis,Orientation
1,123,1400086863513,36.966,-82.589,0,792.458,960.222,337.966
2,123,1400086864516,35.783,-82.647,0,229.497,552.263,66.236
3,123,1400086865518,37.215,-84.237,0,283.729,825.958,38.794
4,123,1400086866521,37.609,-82.255,0,737.285,919.335,108.117
5,123,1400086867523,35.053,-82.897,0,710.054,942.109,46.797
6,123,1400086868525,37.145,-84.328,0,695.256,976.911,40.299
7,123,1400086869528,34.224,-84.809,0,645.225,936.959,260.217
8,123,1400086870530,36.826,-85.623,0,430.444,794.505,257.836
9,123,1400086871542,34.846,-86.305,0,727.479,732.795,8.621
10,123,1400086872556,33.861,-86.148,0,700.236,761.757,160.848
...
(Note: the "Time" field is a Unix/Java-style timestamp, i.e., the number of milliseconds since 1970-01-01.)
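If I'm reading that column correctly, the first row's Time value decodes to roughly 2014-05-14 UTC, e.g.:

    // Sanity check of how I interpret the Time column (epoch milliseconds)
    java.util.Date t = new java.util.Date(1400086863513L); // ~2014-05-14 UTC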
I adapted the GDELTIngest class to SensorIngest.java and the GDELTIngestMapper class to SensorIngestMapper.java, both shown below. I used essentially the same technique from the tutorial to upload my data file (after converting it to TSV) to HDFS, and then ran basically the same ingestion command as the tutorial:
hadoop jar ./target/geomesa-sensor-1.0-SNAPSHOT.jar geomesa.sensor.SensorIngest \
    -instanceId ntc-irad -zookeepers localhost:2181 -user root -password "..." \
    -auths public -tableName sensor -featureName sensor \
    -ingestFile hdfs:///sensor/sensor.tsv
The ingestion begins and the "sensor" table is created in Accumulo, but after processing 4 "entries" the ingestion fails with the following NullPointerException (in Hadoop's MapReduce?):
java.lang.Exception: java.lang.NullPointerException
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
        at geomesa.core.data.AccumuloFeatureReader.<init>(AccumuloFeatureReader.scala:42)
        at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:283)
        at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:275)
        at geomesa.core.data.ModifyAccumuloFeatureWriter.<init>(AccumuloFeatureWriter.scala:146)
        at geomesa.core.data.AccumuloDataStore.createFeatureWriter(AccumuloDataStore.scala:294)
        at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:470)
        at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:503)
        at geomesa.sensor.SensorIngestMapper.setup(SensorIngestMapper.java:64)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
14/05/14 13:44:37 INFO mapreduce.Job: Job job_local1747290386_0001 failed with state FAILED due to: NA
14/05/14 13:44:37 INFO mapreduce.Job: Counters: 0
Exception in thread "main" java.lang.Exception: Job failed
        at geomesa.sensor.SensorIngest.runMapReduceJob(SensorIngest.java:156)
        at geomesa.sensor.SensorIngest.main(SensorIngest.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
The line tagged with "!!!" in the SensorIngestMapper.java listing below is where the code is dying (in the DataStore.getFeatureWriter call).

I'm probably missing something simple, but so far I've been unable to get this simple data file to ingest into Accumulo using the GeoMesa/Hadoop mechanism (although I can ingest data directly into Accumulo).
Also, I'm not sure what attribute type to associate with the "Time" field in the "attributes" list. I've tried ":Time", ":Timestamp", ":Datetime", etc. with no success, so I've had to label it simply as ":Integer".
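For reference, the kind of spec I've been guessing at looks like the following; the ":Date" type and the explicit geometry entry are assumptions on my part (modeled on the GDELT example's attribute list), not something I've verified:

    // Hypothetical attribute spec; "Time:Date" and "*geom:Point:srid=4326" are my guesses
    static List<String> attributes = Lists.newArrayList(
            "EventID:Integer",
            "Sensor:Integer",
            "Time:Date",               // epoch-millis column, stored as a java.util.Date
            "Latitude:Float",
            "Longitude:Float",
            "Altitude:Integer",
            "MinAxis:Float",
            "MajAxis:Float",
            "Orientation:Float",
            "*geom:Point:srid=4326"    // default geometry, as in the GDELT tutorial
    );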
Any help or suggestions would be greatly appreciated!

Regards,
Bob Barnhart (barnhartr@xxxxxxxxxx)
// SensorIngest.java ===========================================================================================

package geomesa.sensor;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import geomesa.core.index.Constants;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.DataUtilities;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeatureType;

import java.util.*;

public class SensorIngest {

    static String INSTANCE_ID = "instanceId";
    static String ZOOKEEPERS = "zookeepers";
    static String USER = "user";
    static String PASSWORD = "password";
    static String AUTHS = "auths";
    static String TABLE_NAME = "tableName";
    static String FEATURE_NAME = "featureName";
    static String INGEST_FILE = "ingestFile";
    static String[] ACCUMULO_CONNECTION_PARAMS = new String[]{INSTANCE_ID,
            ZOOKEEPERS, USER, PASSWORD, AUTHS, TABLE_NAME};

    static Options getCommonRequiredOptions() {
        Options options = new Options();
        Option instanceIdOpt = OptionBuilder.withArgName(INSTANCE_ID)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter instanceId")
                .create(INSTANCE_ID);
        Option zookeepersOpt = OptionBuilder.withArgName(ZOOKEEPERS)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter zookeepers")
                .create(ZOOKEEPERS);
        Option userOpt = OptionBuilder.withArgName(USER)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter user")
                .create(USER);
        Option passwordOpt = OptionBuilder.withArgName(PASSWORD)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter password")
                .create(PASSWORD);
        Option authsOpt = OptionBuilder.withArgName(AUTHS)
                .hasArg()
                .withDescription("accumulo connection parameter auths")
                .create(AUTHS);
        Option tableNameOpt = OptionBuilder.withArgName(TABLE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter tableName")
                .create(TABLE_NAME);
        Option featureNameOpt = OptionBuilder.withArgName(FEATURE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("name of feature in accumulo table")
                .create(FEATURE_NAME);
        options.addOption(instanceIdOpt);
        options.addOption(zookeepersOpt);
        options.addOption(userOpt);
        options.addOption(passwordOpt);
        options.addOption(authsOpt);
        options.addOption(tableNameOpt);
        options.addOption(featureNameOpt);
        return options;
    }

    public static Map<String, String> getAccumuloDataStoreConf(CommandLine cmd) {
        Map<String, String> dsConf = new HashMap<String, String>();
        for (String param : ACCUMULO_CONNECTION_PARAMS) {
            dsConf.put(param, cmd.getOptionValue(param));
        }
        if (dsConf.get(AUTHS) == null) dsConf.put(AUTHS, "");
        return dsConf;
    }

    public static void main(String[] args) throws Exception {
        CommandLineParser parser = new BasicParser();
        Options options = getCommonRequiredOptions();
        Option ingestFileOpt = OptionBuilder.withArgName(INGEST_FILE)
                .hasArg()
                .isRequired()
                .withDescription("ingest tsv file on hdfs")
                .create(INGEST_FILE);
        options.addOption(ingestFileOpt);
        CommandLine cmd = parser.parse(options, args);
        Map<String, String> dsConf = getAccumuloDataStoreConf(cmd);
        String featureName = cmd.getOptionValue(FEATURE_NAME); // e.g. "sensor"
        SimpleFeatureType featureType = buildFeatureType(featureName);
        DataStore ds = DataStoreFinder.getDataStore(dsConf);
        ds.createSchema(featureType);
        runMapReduceJob(featureName,
                "GeoMesa SensorEvent Ingest",
                SensorIngestMapper.class,
                dsConf,
                new Path(cmd.getOptionValue(INGEST_FILE)));
    }

    private static void runMapReduceJob(String featureName,
                                        String jobName,
                                        Class mapper,
                                        Map<String, String> dsConf,
                                        Path mapredCSVFilePath) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(mapper);
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setJobName(jobName);
        Configuration conf = job.getConfiguration();
        for (String key : dsConf.keySet()) {
            conf.set(key, dsConf.get(key));
        }
        conf.set(FEATURE_NAME, featureName);
        FileSystem fs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, mapredCSVFilePath);
        Path tmpPath = new Path("///tmp");
        if (!fs.exists(tmpPath)) {
            fs.create(tmpPath);
        }
        Path outputDir = new Path("///tmp", "geomesa-" + featureName + "-output");
        if (fs.exists(outputDir)) {
            // remove this directory, if it already exists
            fs.delete(outputDir, true);
        }
        Path hdfsJarPath = new Path("///tmp", "geomesa-" + featureName + "-1.0-SNAPSHOT.jar");
        if (fs.exists(hdfsJarPath)) {
            // remove this jar, if it already exists
            fs.delete(hdfsJarPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        fs.copyFromLocalFile(new Path("target/geomesa-" + featureName + "-1.0-SNAPSHOT.jar"), hdfsJarPath);
        for (FileStatus path : fs.listStatus(hdfsJarPath)) {
            job.addArchiveToClassPath(new Path(path.getPath().toUri().getPath()));
        }
        job.submit();
        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }

    private static SimpleFeatureType buildFeatureType(String featureName) throws SchemaException {
        String name = featureName;
        String spec = Joiner.on(",").join(attributes);
        SimpleFeatureType featureType = DataUtilities.createType(name, spec);
        // Tells GeoMesa to use "Time" as the Start Time index
        featureType.getUserData().put(Constants.SF_PROPERTY_START_TIME, "Time");
        return featureType;
    }

    static List<String> attributes = Lists.newArrayList(
            "EventID:Integer",
            "Sensor:Integer",
            "Time:Integer",      // TODO: ISSUE I001 - actually a "Unix" time (e.g., millis since 1970-01-01)
            "Latitude:Float",
            "Longitude:Float",
            "Altitude:Integer",
            "MinAxis:Float",
            "MajAxis:Float",
            "Orientation:Float"
    );
}
// SensorIngestMapper.java ====================================================================================

package geomesa.sensor;

import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

public class SensorIngestMapper extends Mapper<LongWritable, Text, Key, Value> {

    // Columns: EventID, Sensor, Time, Latitude, Longitude, Altitude, MinAxis, MajAxis, Orientation
    private static int ID_COL_IDX = 0;
    private static int TIME_COL_IDX = 2;
    private static int LATITUDE_COL_IDX = 3;
    private static int LONGITUDE_COL_IDX = 4;
    private static int MINIMUM_NUM_FIELDS = 9;

    private SimpleFeatureType featureType = null;
    private FeatureWriter<SimpleFeatureType, SimpleFeature> featureWriter = null;
    private SimpleFeatureBuilder featureBuilder;
    private GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();

    private static void _(String s) {
        System.out.println("(" + s + ")");
    }

    public void setup(Mapper<LongWritable, Text, Key, Value>.Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        _("SensorIngestMapper.setup: ENTER...");
        Map<String, String> connectionParams = new HashMap<String, String>();
        connectionParams.put("instanceId", context.getConfiguration().get("instanceId"));
        connectionParams.put("zookeepers", context.getConfiguration().get("zookeepers"));
        connectionParams.put("user", context.getConfiguration().get("user"));
        connectionParams.put("password", context.getConfiguration().get("password"));
        connectionParams.put("auths", context.getConfiguration().get("auths"));
        connectionParams.put("tableName", context.getConfiguration().get("tableName"));
        String featureName = context.getConfiguration().get("featureName");
        _("SensorIngestMapper.setup: featureName=" + featureName);
        DataStore ds = DataStoreFinder.getDataStore(connectionParams);
        featureType = ds.getSchema(featureName);
        _("SensorIngestMapper.setup: featureType=" + featureType.getTypeName());
        featureBuilder = new SimpleFeatureBuilder(featureType);
        _("SensorIngestMapper.setup: featureBuilder OK...");
        featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT); // NullPointerException on this call!!!
        _("SensorIngestMapper.setup: RETURN.");
    }

    public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Key, Value>.Context context) {
        _("SensorIngestMapper.map: ENTER...");
        String[] attributes = value.toString().split("\\t", -1);
        if (attributes.length >= MINIMUM_NUM_FIELDS
                && !attributes[LATITUDE_COL_IDX].equals("")
                && !attributes[LONGITUDE_COL_IDX].equals("")) {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
            try {
                featureBuilder.reset();
                featureBuilder.addAll(attributes);
                Double lat = Double.parseDouble(attributes[LATITUDE_COL_IDX]); // TODO: update for Lat/Lon offsets
                Double lon = Double.parseDouble(attributes[LONGITUDE_COL_IDX]);
                Geometry geom = geometryFactory.createPoint(new Coordinate(lon, lat));
                SimpleFeature simpleFeature = featureBuilder.buildFeature(attributes[ID_COL_IDX]);
                simpleFeature.setAttribute("Time", formatter.parse(attributes[TIME_COL_IDX]));
                simpleFeature.setDefaultGeometry(geom);
                try {
                    SimpleFeature next = featureWriter.next();
                    for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
                        next.setAttribute(i, simpleFeature.getAttribute(i));
                    }
                    ((FeatureIdImpl) next.getIdentifier()).setID(simpleFeature.getID());
                    featureWriter.write();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        _("SensorIngestMapper.map: RETURN ...");
    }
}
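One more note related to the Time-attribute question above: in map() I'm currently parsing the Time column with SimpleDateFormat("yyyyMMdd"), which obviously doesn't match the epoch-millis values in my file. If the schema accepted a date type, I assume the parse would instead look something like this (untested sketch):

    // Hypothetical replacement for the SimpleDateFormat parse in map(), assuming a
    // Date-typed "Time" attribute; not verified against GeoMesa
    simpleFeature.setAttribute("Time", new java.util.Date(Long.parseLong(attributes[TIME_COL_IDX])));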
Bob Barnhart
Chief Systems Engineer | 858 826 5596 (Office) | 619 972 9489 (Mobile) | barnhartr@xxxxxxxxxx