(My apologies if this is a re-post due to email-address issues)
I have experienced problems ingesting any of the sample GDELT data cited in the GeoMesa tutorial instructions at
http://geomesa.github.io/2014/04/17/geomesa-gdelt-analysis/.
Consequently, I tried to adapt the code from https://github.com/ccri/geomesa-gdelt to ingest a different data set.
My data (CSV format; I converted it to TSV for ingestion) looks like this:
EventID,Sensor,Time,Latitude,Longitude,Altitude,MinAxis,MajAxis,Orientation
1,123,1400086863513,36.966,-82.589,0,792.458,960.222,337.966
2,123,1400086864516,35.783,-82.647,0,229.497,552.263,66.236
3,123,1400086865518,37.215,-84.237,0,283.729,825.958,38.794
4,123,1400086866521,37.609,-82.255,0,737.285,919.335,108.117
5,123,1400086867523,35.053,-82.897,0,710.054,942.109,46.797
6,123,1400086868525,37.145,-84.328,0,695.256,976.911,40.299
7,123,1400086869528,34.224,-84.809,0,645.225,936.959,260.217
8,123,1400086870530,36.826,-85.623,0,430.444,794.505,257.836
9,123,1400086871542,34.846,-86.305,0,727.479,732.795,8.621
10,123,1400086872556,33.861,-86.148,0,700.236,761.757,160.848
. . .
(Note: the "Time" field is a Unix/Java-style timestamp, i.e., the number of milliseconds since 1970-01-01.)
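For reference, this is how I interpret those values (a throwaway snippet of mine, just to illustrate the encoding):

    long millis = 1400086863513L;                       // "Time" value from the first row
    java.util.Date when = new java.util.Date(millis);   // roughly 2014-05-14 UTC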
I adapted the GDELTIngest class to SensorIngest.java as shown below.
I also adapted the GDELTIngestMapper class to SensorIngestMapper.java as shown below.
I essentially used the technique from the tutorial to upload my data file to HDFS (after converting it to TSV format),
and then used basically the same ingestion command as the tutorial:
hadoop jar ./target/geomesa-sensor-1.0-SNAPSHOT.jar geomesa.sensor.SensorIngest \
    -instanceId ntc-irad -zookeepers localhost:2181 -user root -password "..." -auths public \
    -tableName sensor -featureName sensor -ingestFile hdfs:///sensor/sensor.tsv
The ingestion begins and the "sensor" table is created in Accumulo, but after processing 4 "entries" the ingestion fails with the following NullPointerException (in Hadoop's MapReduce, I believe):
java.lang.Exception: java.lang.NullPointerException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
at geomesa.core.data.AccumuloFeatureReader.<init>(AccumuloFeatureReader.scala:42)
at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:283)
at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:275)
at geomesa.core.data.ModifyAccumuloFeatureWriter.<init>(AccumuloFeatureWriter.scala:146)
at geomesa.core.data.AccumuloDataStore.createFeatureWriter(AccumuloDataStore.scala:294)
at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:470)
at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:503)
at geomesa.sensor.SensorIngestMapper.setup(SensorIngestMapper.java:64)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/05/14 13:44:37 INFO mapreduce.Job: Job job_local1747290386_0001 failed with state FAILED due to: NA
14/05/14 13:44:37 INFO mapreduce.Job: Counters: 0
Exception in thread "main" java.lang.Exception: Job failed
at geomesa.sensor.SensorIngest.runMapReduceJob(SensorIngest.java:156)
at geomesa.sensor.SensorIngest.main(SensorIngest.java:112)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
The line tagged with "!!!" in the SensorIngestMapper.java listing below is where the code dies (in the DataStore.getFeatureWriter call).
I'm probably missing something simple, but so far I've been unable to get this simple data file to ingest into Accumulo
using the GeoMesa/Hadoop mechanism (although I can ingest data directly into Accumulo).
Also, I'm not sure what attribute type to associate with the "Time" field in the "attributes" list. I've tried ":Time",
":Timestamp", ":Datetime", etc. with no success, so I've had to label it simply as ":Integer".
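For what it's worth, below is roughly what I expected to be able to declare and parse once "Time" is typed correctly. This is only a sketch on my part, assuming the GeoTools spec string accepts a Date binding (the GDELT example declares its SQLDATE field that way, if I recall correctly):

    // Sketch only (untested): declare "Time" as a Date in the attributes list ...
    "Time:Date",    // instead of "Time:Integer"

    // ... and, in SensorIngestMapper.map(), convert the epoch-millis string directly,
    // instead of parsing it with SimpleDateFormat("yyyyMMdd"):
    simpleFeature.setAttribute("Time", new java.util.Date(Long.parseLong(attributes[TIME_COL_IDX])));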
Any help or suggestions would be greatly appreciated!
Regards,
Bob Barnhart (barnhartr@xxxxxxxxxx)
// SensorIngest.java ===========================================================================================
package geomesa.sensor;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import geomesa.core.index.Constants;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.DataUtilities;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeatureType;
import java.util.*;
public class SensorIngest {

    static String INSTANCE_ID = "instanceId";
    static String ZOOKEEPERS = "zookeepers";
    static String USER = "user";
    static String PASSWORD = "password";
    static String AUTHS = "auths";
    static String TABLE_NAME = "tableName";
    static String FEATURE_NAME = "featureName";
    static String INGEST_FILE = "ingestFile";
    static String[] ACCUMULO_CONNECTION_PARAMS = new String[]{INSTANCE_ID,
            ZOOKEEPERS, USER, PASSWORD, AUTHS, TABLE_NAME};
    static Options getCommonRequiredOptions() {
        Options options = new Options();
        Option instanceIdOpt = OptionBuilder.withArgName(INSTANCE_ID)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter instanceId")
                .create(INSTANCE_ID);
        Option zookeepersOpt = OptionBuilder.withArgName(ZOOKEEPERS)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter zookeepers")
                .create(ZOOKEEPERS);
        Option userOpt = OptionBuilder.withArgName(USER)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter user")
                .create(USER);
        Option passwordOpt = OptionBuilder.withArgName(PASSWORD)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter password")
                .create(PASSWORD);
        Option authsOpt = OptionBuilder.withArgName(AUTHS)
                .hasArg()
                .withDescription("accumulo connection parameter auths")
                .create(AUTHS);
        Option tableNameOpt = OptionBuilder.withArgName(TABLE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter tableName")
                .create(TABLE_NAME);
        Option featureNameOpt = OptionBuilder.withArgName(FEATURE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("name of feature in accumulo table")
                .create(FEATURE_NAME);
        options.addOption(instanceIdOpt);
        options.addOption(zookeepersOpt);
        options.addOption(userOpt);
        options.addOption(passwordOpt);
        options.addOption(authsOpt);
        options.addOption(tableNameOpt);
        options.addOption(featureNameOpt);
        return options;
    }
    public static Map<String, String> getAccumuloDataStoreConf(CommandLine cmd) {
        Map<String, String> dsConf = new HashMap<String, String>();
        for (String param : ACCUMULO_CONNECTION_PARAMS) {
            dsConf.put(param, cmd.getOptionValue(param));
        }
        if (dsConf.get(AUTHS) == null) dsConf.put(AUTHS, "");
        return dsConf;
    }
    public static void main(String[] args) throws Exception {
        CommandLineParser parser = new BasicParser();
        Options options = getCommonRequiredOptions();
        Option ingestFileOpt = OptionBuilder.withArgName(INGEST_FILE)
                .hasArg()
                .isRequired()
                .withDescription("ingest tsv file on hdfs")
                .create(INGEST_FILE);
        options.addOption(ingestFileOpt);
        CommandLine cmd = parser.parse(options, args);
        Map<String, String> dsConf = getAccumuloDataStoreConf(cmd);
        String featureName = cmd.getOptionValue(FEATURE_NAME); // e.g. "sensor"
        SimpleFeatureType featureType = buildFeatureType(featureName);
        DataStore ds = DataStoreFinder.getDataStore(dsConf);
        ds.createSchema(featureType);
        runMapReduceJob(featureName,
                "GeoMesa SensorEvent Ingest",
                SensorIngestMapper.class,
                dsConf,
                new Path(cmd.getOptionValue(INGEST_FILE)));
    }
    private static void runMapReduceJob(String featureName, String jobName, Class mapper,
                                        Map<String, String> dsConf,
                                        Path mapredCSVFilePath) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(mapper);
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setJobName(jobName);
        Configuration conf = job.getConfiguration();
        for (String key : dsConf.keySet()) {
            conf.set(key, dsConf.get(key));
        }
        conf.set(FEATURE_NAME, featureName);
        FileSystem fs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, mapredCSVFilePath);
        Path tmpPath = new Path("///tmp");
        if (!fs.exists(tmpPath)) {
            fs.create(tmpPath);
        }
        Path outputDir = new Path("///tmp", "geomesa-" + featureName + "-output");
        if (fs.exists(outputDir)) {
            // remove this directory, if it already exists
            fs.delete(outputDir, true);
        }
        Path hdfsJarPath = new Path("///tmp", "geomesa-" + featureName + "-1.0-SNAPSHOT.jar");
        if (fs.exists(hdfsJarPath)) {
            // remove this jar, if it already exists
            fs.delete(hdfsJarPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        fs.copyFromLocalFile(new Path("target/geomesa-" + featureName + "-1.0-SNAPSHOT.jar"), hdfsJarPath);
        for (FileStatus path : fs.listStatus(hdfsJarPath)) {
            job.addArchiveToClassPath(new Path(path.getPath().toUri().getPath()));
        }
        job.submit();
        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }
    private static SimpleFeatureType buildFeatureType(String featureName) throws SchemaException {
        String name = featureName;
        String spec = Joiner.on(",").join(attributes);
        SimpleFeatureType featureType = DataUtilities.createType(name, spec);
        featureType.getUserData().put(Constants.SF_PROPERTY_START_TIME, "Time"); // Tells GeoMesa to use "Time" as the Start Time index
        return featureType;
    }

    static List<String> attributes = Lists.newArrayList(
            "EventID:Integer",
            "Sensor:Integer",
            "Time:Integer", // TODO: ISSUE I001 - actually a "Unix" time (e.g., millis since 1970.01.01)
            "Latitude:Float",
            "Longitude:Float",
            "Altitude:Integer",
            "MinAxis:Float",
            "MajAxis:Float",
            "Orientation:Float"
    );
}
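(For reference, the spec string that buildFeatureType passes to DataUtilities.createType comes out as

    EventID:Integer,Sensor:Integer,Time:Integer,Latitude:Float,Longitude:Float,Altitude:Integer,MinAxis:Float,MajAxis:Float,Orientation:Float

so there is no explicit geometry attribute declared in the schema; the mapper just calls setDefaultGeometry on each feature.)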
// SensorIngestMapper.java ====================================================================================
package geomesa.sensor;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
public class SensorIngestMapper extends Mapper<LongWritable, Text, Key, Value> {

    // Columns: EventID, Sensor, Time, Latitude, Longitude, Altitude, MinAxis, MajAxis, Orientation
    private static int ID_COL_IDX = 0;
    private static int TIME_COL_IDX = 2;
    private static int LATITUDE_COL_IDX = 3;
    private static int LONGITUDE_COL_IDX = 4;
    private static int MINIMUM_NUM_FIELDS = 9;

    private SimpleFeatureType featureType = null;
    private FeatureWriter<SimpleFeatureType, SimpleFeature> featureWriter = null;
    private SimpleFeatureBuilder featureBuilder;
    private GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();

    private static void _(String s) {
        System.out.println("(" + s + ")");
    }
    public void setup(Mapper<LongWritable, Text, Key, Value>.Context context) throws IOException, InterruptedException {
        super.setup(context);
        _("SensorIngestMapper.setup: ENTER...");
        Map<String, String> connectionParams = new HashMap<String, String>();
        connectionParams.put("instanceId", context.getConfiguration().get("instanceId"));
        connectionParams.put("zookeepers", context.getConfiguration().get("zookeepers"));
        connectionParams.put("user", context.getConfiguration().get("user"));
        connectionParams.put("password", context.getConfiguration().get("password"));
        connectionParams.put("auths", context.getConfiguration().get("auths"));
        connectionParams.put("tableName", context.getConfiguration().get("tableName"));
        String featureName = context.getConfiguration().get("featureName");
        _("SensorIngestMapper.setup: featureName=" + featureName);
        DataStore ds = DataStoreFinder.getDataStore(connectionParams);
        featureType = ds.getSchema(featureName);
        _("SensorIngestMapper.setup: featureType=" + featureType.getTypeName());
        featureBuilder = new SimpleFeatureBuilder(featureType);
        _("SensorIngestMapper.setup: featureBuilder OK...");
        featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT); // NullPointerException on this call!!!
        _("SensorIngestMapper.setup: RETURN.");
    }
    public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Key, Value>.Context context) {
        _("SensorIngestMapper.map: ENTER...");
        String[] attributes = value.toString().split("\\t", -1);
        if (attributes.length >= MINIMUM_NUM_FIELDS && !attributes[LATITUDE_COL_IDX].equals("") && !attributes[LONGITUDE_COL_IDX].equals("")) {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
            try {
                featureBuilder.reset();
                featureBuilder.addAll(attributes);
                Double lat = Double.parseDouble(attributes[LATITUDE_COL_IDX]); // TODO: update for Lat/Lon offsets
                Double lon = Double.parseDouble(attributes[LONGITUDE_COL_IDX]);
                Geometry geom = geometryFactory.createPoint(new Coordinate(lon, lat));
                SimpleFeature simpleFeature = featureBuilder.buildFeature(attributes[ID_COL_IDX]);
                simpleFeature.setAttribute("Time", formatter.parse(attributes[TIME_COL_IDX]));
                simpleFeature.setDefaultGeometry(geom);
                try {
                    SimpleFeature next = featureWriter.next();
                    for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
                        next.setAttribute(i, simpleFeature.getAttribute(i));
                    }
                    ((FeatureIdImpl) next.getIdentifier()).setID(simpleFeature.getID());
                    featureWriter.write();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        _("SensorIngestMapper.map: RETURN ...");
    }
}