(My apologies if this is a re-post due to email-address issues.)
I have experienced problems ingesting any of the sample GDELT data cited in the GeoMesa tutorial instructions at http://geomesa.github.io/2014/04/17/geomesa-gdelt-analysis/. Consequently, I tried to adapt the code from https://github.com/ccri/geomesa-gdelt to ingest a different data set.

My data (in .CSV format; I converted it to .TSV for ingestion) looks like:
EventID,Sensor,Time,Latitude,Longitude,Altitude,MinAxis,MajAxis,Orientation
1,123,1400086863513,36.966,-82.589,0,792.458,960.222,337.966
2,123,1400086864516,35.783,-82.647,0,229.497,552.263,66.236
3,123,1400086865518,37.215,-84.237,0,283.729,825.958,38.794
4,123,1400086866521,37.609,-82.255,0,737.285,919.335,108.117
5,123,1400086867523,35.053,-82.897,0,710.054,942.109,46.797
6,123,1400086868525,37.145,-84.328,0,695.256,976.911,40.299
7,123,1400086869528,34.224,-84.809,0,645.225,936.959,260.217
8,123,1400086870530,36.826,-85.623,0,430.444,794.505,257.836
9,123,1400086871542,34.846,-86.305,0,727.479,732.795,8.621
10,123,1400086872556,33.861,-86.148,0,700.236,761.757,160.848
. . .
(Note: the “Time” field is a Unix/Java-style timestamp, i.e., the number of milliseconds since 1970-01-01.)
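As a quick sanity check, in Java the first row’s value converts directly:

    new java.util.Date(1400086863513L)  // Wed May 14 17:01:03 UTC 2014

which looks right for this data set.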
I adapted the GDELTIngest class to SensorIngest.java and the GDELTIngestMapper class to SensorIngestMapper.java, both shown below. I used essentially the technique from the tutorial to upload my data file to HDFS (after converting it to .TSV format), and then ran basically the same ingestion command as in the tutorial:
hadoop jar ./target/geomesa-sensor-1.0-SNAPSHOT.jar geomesa.sensor.SensorIngest -instanceId ntc-irad -zookeepers localhost:2181 -user root -password "…" -auths public -tableName sensor -featureName sensor -ingestFile hdfs:///sensor/sensor.tsv
The ingestion begins and the “sensor” table is created in Accumulo, but after processing 4 “entries” the ingestion fails with the following NullPointerException (in Hadoop’s MapReduce?):
java.lang.Exception: java.lang.NullPointerException
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
	at geomesa.core.data.AccumuloFeatureReader.<init>(AccumuloFeatureReader.scala:42)
	at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:283)
	at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:275)
	at geomesa.core.data.ModifyAccumuloFeatureWriter.<init>(AccumuloFeatureWriter.scala:146)
	at geomesa.core.data.AccumuloDataStore.createFeatureWriter(AccumuloDataStore.scala:294)
	at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:470)
	at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:503)
	at geomesa.sensor.SensorIngestMapper.setup(SensorIngestMapper.java:64)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
14/05/14 13:44:37 INFO mapreduce.Job: Job job_local1747290386_0001 failed with state FAILED due to: NA
14/05/14 13:44:37 INFO mapreduce.Job: Counters: 0
Exception in thread "main" java.lang.Exception: Job failed
	at geomesa.sensor.SensorIngest.runMapReduceJob(SensorIngest.java:156)
	at geomesa.sensor.SensorIngest.main(SensorIngest.java:112)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
The line tagged with “!!!” in the SensorIngestMapper.java listing below is where the code dies (in the DataStore.getFeatureWriter call). I’m probably missing something simple, but so far I’ve been unable to get this simple data file to ingest into Accumulo using the GeoMesa/Hadoop mechanism (although I can ingest data directly into Accumulo).
Also, I’m not sure what attribute type to associate with the “Time” field in the “attributes” list. I’ve tried “:Time”, “:Timestamp”, “:Datetime”, etc. with no success, so for now I’ve had to label it simply as “:Integer”.
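For reference, the GDELT tutorial’s feature type declares its date field as “SQLDATE:Date”, so my guess (untested, and possibly part of my problem) is that “Time” should be bound to java.util.Date via “:Date”, with the mapper converting the epoch millis directly. A minimal sketch of what I mean (TimeFieldSketch is just a hypothetical name for illustration):

    import java.util.Date;
    import org.geotools.data.DataUtilities;
    import org.geotools.feature.SchemaException;
    import org.opengis.feature.simple.SimpleFeatureType;

    public class TimeFieldSketch {
        // Sketch only: assumes the ":Date" binding (java.util.Date) that the
        // GDELT tutorial uses for its "SQLDATE:Date" attribute.
        static SimpleFeatureType buildType() throws SchemaException {
            return DataUtilities.createType("sensor",
                    "EventID:Integer,Sensor:Integer,Time:Date,Latitude:Float,Longitude:Float,"
                  + "Altitude:Integer,MinAxis:Float,MajAxis:Float,Orientation:Float");
        }

        // In the mapper, an epoch-millis string then converts without SimpleDateFormat:
        static Date parseTime(String millisField) {
            return new Date(Long.parseLong(millisField)); // e.g. "1400086863513"
        }
    }

But I haven’t been able to verify this because of the NullPointerException above.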
Any help or suggestions would be greatly appreciated!
Regards,
Bob Barnhart (barnhartr@xxxxxxxxxx)
// SensorIngest.java
===========================================================================================
package geomesa.sensor;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import geomesa.core.index.Constants;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.DataUtilities;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeatureType;

import java.util.*;
public class SensorIngest {

    static String INSTANCE_ID = "instanceId";
    static String ZOOKEEPERS = "zookeepers";
    static String USER = "user";
    static String PASSWORD = "password";
    static String AUTHS = "auths";
    static String TABLE_NAME = "tableName";
    static String FEATURE_NAME = "featureName";
    static String INGEST_FILE = "ingestFile";

    static String[] ACCUMULO_CONNECTION_PARAMS =
            new String[]{INSTANCE_ID, ZOOKEEPERS, USER, PASSWORD, AUTHS, TABLE_NAME};

    static Options getCommonRequiredOptions() {
        Options options = new Options();
        Option instanceIdOpt = OptionBuilder.withArgName(INSTANCE_ID)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter instanceId")
                .create(INSTANCE_ID);
        Option zookeepersOpt = OptionBuilder.withArgName(ZOOKEEPERS)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter zookeepers")
                .create(ZOOKEEPERS);
        Option userOpt = OptionBuilder.withArgName(USER)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter user")
                .create(USER);
        Option passwordOpt = OptionBuilder.withArgName(PASSWORD)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter password")
                .create(PASSWORD);
        Option authsOpt = OptionBuilder.withArgName(AUTHS)
                .hasArg()
                .withDescription("accumulo connection parameter auths")
                .create(AUTHS);
        Option tableNameOpt = OptionBuilder.withArgName(TABLE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter tableName")
                .create(TABLE_NAME);
        Option featureNameOpt = OptionBuilder.withArgName(FEATURE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("name of feature in accumulo table")
                .create(FEATURE_NAME);
        options.addOption(instanceIdOpt);
        options.addOption(zookeepersOpt);
        options.addOption(userOpt);
        options.addOption(passwordOpt);
        options.addOption(authsOpt);
        options.addOption(tableNameOpt);
        options.addOption(featureNameOpt);
        return options;
    }

    public static Map<String, String> getAccumuloDataStoreConf(CommandLine cmd) {
        Map<String, String> dsConf = new HashMap<String, String>();
        for (String param : ACCUMULO_CONNECTION_PARAMS) {
            dsConf.put(param, cmd.getOptionValue(param));
        }
        if (dsConf.get(AUTHS) == null) dsConf.put(AUTHS, "");
        return dsConf;
    }
    public static void main(String[] args) throws Exception {
        CommandLineParser parser = new BasicParser();
        Options options = getCommonRequiredOptions();
        Option ingestFileOpt = OptionBuilder.withArgName(INGEST_FILE)
                .hasArg()
                .isRequired()
                .withDescription("ingest tsv file on hdfs")
                .create(INGEST_FILE);
        options.addOption(ingestFileOpt);
        CommandLine cmd = parser.parse(options, args);

        Map<String, String> dsConf = getAccumuloDataStoreConf(cmd);
        String featureName = cmd.getOptionValue(FEATURE_NAME); // e.g. "sensor"
        SimpleFeatureType featureType = buildFeatureType(featureName);

        DataStore ds = DataStoreFinder.getDataStore(dsConf);
        ds.createSchema(featureType);

        runMapReduceJob(featureName,
                "GeoMesa SensorEvent Ingest",
                SensorIngestMapper.class,
                dsConf,
                new Path(cmd.getOptionValue(INGEST_FILE)));
    }
    private static void runMapReduceJob(String featureName, String jobName, Class mapper,
                                        Map<String, String> dsConf, Path mapredCSVFilePath)
            throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(mapper);
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setJobName(jobName);

        // pass the Accumulo connection parameters through to the mappers
        Configuration conf = job.getConfiguration();
        for (String key : dsConf.keySet()) {
            conf.set(key, dsConf.get(key));
        }
        conf.set(FEATURE_NAME, featureName);

        FileSystem fs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, mapredCSVFilePath);

        Path tmpPath = new Path("///tmp");
        if (!fs.exists(tmpPath)) {
            fs.create(tmpPath);
        }
        Path outputDir = new Path("///tmp", "geomesa-" + featureName + "-output");
        if (fs.exists(outputDir)) {
            // remove this directory, if it already exists
            fs.delete(outputDir, true);
        }
        Path hdfsJarPath = new Path("///tmp", "geomesa-" + featureName + "-1.0-SNAPSHOT.jar");
        if (fs.exists(hdfsJarPath)) {
            // remove this jar, if it already exists
            fs.delete(hdfsJarPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);

        // stage the job jar on HDFS and add it to the task classpath
        fs.copyFromLocalFile(new Path("target/geomesa-" + featureName + "-1.0-SNAPSHOT.jar"), hdfsJarPath);
        for (FileStatus path : fs.listStatus(hdfsJarPath)) {
            job.addArchiveToClassPath(new Path(path.getPath().toUri().getPath()));
        }

        job.submit();
        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }
    private static SimpleFeatureType buildFeatureType(String featureName) throws SchemaException {
        String name = featureName;
        String spec = Joiner.on(",").join(attributes);
        SimpleFeatureType featureType = DataUtilities.createType(name, spec);
        // tells GeoMesa to use "Time" as the start-time index field
        featureType.getUserData().put(Constants.SF_PROPERTY_START_TIME, "Time");
        return featureType;
    }

    static List<String> attributes = Lists.newArrayList(
            "EventID:Integer",
            "Sensor:Integer",
            "Time:Integer", // TODO: ISSUE I001 - actually a "Unix" time (millis since 1970-01-01)
            "Latitude:Float",
            "Longitude:Float",
            "Altitude:Integer",
            "MinAxis:Float",
            "MajAxis:Float",
            "Orientation:Float"
    );
}
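(One difference from the GDELT tutorial’s attribute list that I notice on re-reading: the tutorial’s spec ends with a default-geometry attribute, along the lines of

    "*geom:Point:srid=4326"  // declares the indexed default geometry

whereas my list has no geometry attribute at all. I don’t know whether GeoMesa requires one even though I build the Point in the mapper, but I mention it in case it’s relevant.)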
// SensorIngestMapper.java
====================================================================================
package geomesa.sensor;

import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
public class SensorIngestMapper extends Mapper<LongWritable, Text, Key, Value> {

    // Columns: EventID, Sensor, Time, Latitude, Longitude, Altitude, MinAxis, MajAxis, Orientation
    private static int ID_COL_IDX = 0;
    private static int TIME_COL_IDX = 2;
    private static int LATITUDE_COL_IDX = 3;
    private static int LONGITUDE_COL_IDX = 4;
    private static int MINIMUM_NUM_FIELDS = 9;

    private SimpleFeatureType featureType = null;
    private FeatureWriter<SimpleFeatureType, SimpleFeature> featureWriter = null;
    private SimpleFeatureBuilder featureBuilder;
    private GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();

    // crude stdout trace helper
    private static void _(String s) {
        System.out.println("(" + s + ")");
    }
    public void setup(Mapper<LongWritable, Text, Key, Value>.Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        _("SensorIngestMapper.setup: ENTER...");

        Map<String, String> connectionParams = new HashMap<String, String>();
        connectionParams.put("instanceId", context.getConfiguration().get("instanceId"));
        connectionParams.put("zookeepers", context.getConfiguration().get("zookeepers"));
        connectionParams.put("user", context.getConfiguration().get("user"));
        connectionParams.put("password", context.getConfiguration().get("password"));
        connectionParams.put("auths", context.getConfiguration().get("auths"));
        connectionParams.put("tableName", context.getConfiguration().get("tableName"));

        String featureName = context.getConfiguration().get("featureName");
        _("SensorIngestMapper.setup: featureName=" + featureName);

        DataStore ds = DataStoreFinder.getDataStore(connectionParams);
        featureType = ds.getSchema(featureName);
        _("SensorIngestMapper.setup: featureType=" + featureType.getTypeName());

        featureBuilder = new SimpleFeatureBuilder(featureType);
        _("SensorIngestMapper.setup: featureBuilder OK...");

        featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT); // NullPointerException on this call!!!
        _("SensorIngestMapper.setup: RETURN.");
    }
    public void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Key, Value>.Context context) {
        _("SensorIngestMapper.map: ENTER...");
        String[] attributes = value.toString().split("\\t", -1);
        if (attributes.length >= MINIMUM_NUM_FIELDS &&
                !attributes[LATITUDE_COL_IDX].equals("") &&
                !attributes[LONGITUDE_COL_IDX].equals("")) {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
            try {
                featureBuilder.reset();
                featureBuilder.addAll(attributes);

                Double lat = Double.parseDouble(attributes[LATITUDE_COL_IDX]); // TODO: update for Lat/Lon offsets
                Double lon = Double.parseDouble(attributes[LONGITUDE_COL_IDX]);
                Geometry geom = geometryFactory.createPoint(new Coordinate(lon, lat));

                SimpleFeature simpleFeature = featureBuilder.buildFeature(attributes[ID_COL_IDX]);
                simpleFeature.setAttribute("Time", formatter.parse(attributes[TIME_COL_IDX]));
                simpleFeature.setDefaultGeometry(geom);

                try {
                    SimpleFeature next = featureWriter.next();
                    for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
                        next.setAttribute(i, simpleFeature.getAttribute(i));
                    }
                    ((FeatureIdImpl) next.getIdentifier()).setID(simpleFeature.getID());
                    featureWriter.write();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        _("SensorIngestMapper.map: RETURN ...");
    }
}
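P.S. Re-reading map(), I see the Time column is parsed with new SimpleDateFormat("yyyyMMdd"), a leftover from the GDELT code (whose SQLDATE field really is in yyyyMMdd form), which cannot sensibly parse an epoch-millis string like 1400086863513. Assuming the ":Date" binding sketched above is right, I expect that line should become something like:

    simpleFeature.setAttribute("Time", new java.util.Date(Long.parseLong(attributes[TIME_COL_IDX])));

That said, the job dies in setup() before map() is ever reached, so this can't be the cause of the NullPointerException itself.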
Bob Barnhart
Chief Systems Engineer | 858 826 5596 (Office) | 619 972 9489 (Mobile) | barnhartr@xxxxxxxxxx