(My apologies if this is a re-post due to email-address issues.)

I have experienced problems ingesting any of the sample GDELT data cited in the GeoMesa tutorial instructions at http://geomesa.github.io/2014/04/17/geomesa-gdelt-analysis/.

Consequently, I tried to adapt the code from https://github.com/ccri/geomesa-gdelt to ingest a different data set.

My data (in .CSV format; I converted it to .TSV format for ingestion) looks like:
                     
                    EventID,Sensor,Time,Latitude,Longitude,Altitude,MinAxis,MajAxis,Orientation
                    1,123,1400086863513,36.966,-82.589,0,792.458,960.222,337.966
                    2,123,1400086864516,35.783,-82.647,0,229.497,552.263,66.236
                    3,123,1400086865518,37.215,-84.237,0,283.729,825.958,38.794
                    4,123,1400086866521,37.609,-82.255,0,737.285,919.335,108.117
                    5,123,1400086867523,35.053,-82.897,0,710.054,942.109,46.797
                    6,123,1400086868525,37.145,-84.328,0,695.256,976.911,40.299
                    7,123,1400086869528,34.224,-84.809,0,645.225,936.959,260.217
                    8,123,1400086870530,36.826,-85.623,0,430.444,794.505,257.836
                    9,123,1400086871542,34.846,-86.305,0,727.479,732.795,8.621
                    10,123,1400086872556,33.861,-86.148,0,700.236,761.757,160.848
. . .

(Note: the “Time” field is a Unix/Java-style time, i.e., the number of milliseconds since 1970-01-01 UTC.)
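To make the “Time” format concrete, here is a minimal sketch (standard library only, not taken from the GeoMesa or GDELT code) of turning one of these millisecond strings into a java.util.Date. The class and method names are hypothetical, and toIsoUtc assumes Java 8 or later for java.time.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical helper: converts the "Time" column (epoch milliseconds as text)
// into date objects. Not part of GeoMesa or the GDELT example.
class EpochTime {

    /** Parses a string holding milliseconds since 1970-01-01 UTC into a Date. */
    static Date toDate(String millisField) {
        return new Date(Long.parseLong(millisField.trim()));
    }

    /** Renders the same instant as an ISO-8601 UTC string, handy for eyeballing values. */
    static String toIsoUtc(String millisField) {
        return Instant.ofEpochMilli(Long.parseLong(millisField.trim())).toString();
    }

    public static void main(String[] args) {
        // First row of the sample data above.
        System.out.println(toIsoUtc("1400086863513")); // prints 2014-05-14T17:01:03.513Z
    }
}
```

A pattern-based parser such as SimpleDateFormat("yyyyMMdd") is meant for calendar strings like 20140514, not for a raw millisecond count, so a Long-based conversion along these lines is probably the closer fit for this column.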
                     
I adapted the GDELTIngest class to SensorIngest.java, as shown below.

I also adapted the GDELTIngestMapper class to SensorIngestMapper.java, as shown below.

I essentially used the technique from the tutorial to upload my data file to HDFS (after converting it to .TSV format).
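For reference, the CSV-to-TSV step can be sketched as below. This is only an illustration with hypothetical names (CsvToTsv, convertLine, convert); it assumes simple rows with no quoted or embedded commas, as in the sample data, and Java 8 or later for String.join. Whether the header row was kept in the uploaded file is not stated here, so the sketch makes that a flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical converter for simple, unquoted CSV rows such as the sensor sample.
class CsvToTsv {

    /** Converts one CSV line to a tab-separated line, keeping empty trailing fields. */
    static String convertLine(String csvLine) {
        return String.join("\t", csvLine.split(",", -1));
    }

    /** Converts all lines, optionally dropping the first (header) line. */
    static List<String> convert(List<String> csvLines, boolean skipHeader) {
        List<String> out = new ArrayList<>();
        for (int i = skipHeader ? 1 : 0; i < csvLines.size(); i++) {
            out.add(convertLine(csvLines.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> csv = Arrays.asList(
            "EventID,Sensor,Time,Latitude,Longitude,Altitude,MinAxis,MajAxis,Orientation",
            "1,123,1400086863513,36.966,-82.589,0,792.458,960.222,337.966");
        for (String line : convert(csv, true)) {
            System.out.println(line);
        }
    }
}
```

Dropping the header matters for a mapper that parses the Latitude and Longitude columns with Double.parseDouble, since a header line would otherwise reach that call as text.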
                     
I then used basically the same ingestion command as in the tutorial:
                     
hadoop jar ./target/geomesa-sensor-1.0-SNAPSHOT.jar geomesa.sensor.SensorIngest \
    -instanceId ntc-irad -zookeepers localhost:2181 \
    -user root -password “…” -auths public \
    -tableName sensor -featureName sensor \
    -ingestFile hdfs:///sensor/sensor.tsv
                     
The ingestion begins and the “sensor” table is created in Accumulo, but after processing 4 “entries” the ingestion fails with the following NullPointerException (in Hadoop’s MapReduce?):
                     
java.lang.Exception: java.lang.NullPointerException
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
        at geomesa.core.data.AccumuloFeatureReader.<init>(AccumuloFeatureReader.scala:42)
        at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:283)
        at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:275)
        at geomesa.core.data.ModifyAccumuloFeatureWriter.<init>(AccumuloFeatureWriter.scala:146)
        at geomesa.core.data.AccumuloDataStore.createFeatureWriter(AccumuloDataStore.scala:294)
        at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:470)
        at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:503)
        at geomesa.sensor.SensorIngestMapper.setup(SensorIngestMapper.java:64)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
14/05/14 13:44:37 INFO mapreduce.Job: Job job_local1747290386_0001 failed with state FAILED due to: NA
14/05/14 13:44:37 INFO mapreduce.Job: Counters: 0
Exception in thread "main" java.lang.Exception: Job failed
        at geomesa.sensor.SensorIngest.runMapReduceJob(SensorIngest.java:156)
        at geomesa.sensor.SensorIngest.main(SensorIngest.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
                     
The line tagged with “!!!” in the SensorIngestMapper.java listing is where the code is dying (in the DataStore.getFeatureWriter call).
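One inexpensive thing to rule out before that call is a connection parameter that never made it into the job configuration. The sketch below is purely a diagnostic aid under that assumption; it is not a confirmed cause of the NullPointerException above. ParamCheck and its methods are hypothetical names, and the key list simply mirrors the parameter names used in the listings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic helper: reports connection parameters whose value is null.
class ParamCheck {

    // Same keys the mapper copies out of the Hadoop Configuration.
    static final String[] REQUIRED =
        {"instanceId", "zookeepers", "user", "password", "auths", "tableName"};

    /** Returns the required keys that are absent or mapped to null. */
    static List<String> missingKeys(Map<String, String> params) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (params.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Fails fast with a readable message instead of a later, less specific NPE. */
    static void requireAll(Map<String, String> params) {
        List<String> missing = missingKeys(params);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing connection parameters: " + missing);
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("instanceId", "ntc-irad");
        params.put("zookeepers", "localhost:2181");
        System.out.println(missingKeys(params)); // prints [user, password, auths, tableName]
    }
}
```

In the mapper, a check like requireAll(connectionParams) would sit just before the DataStoreFinder.getDataStore call, so that a configuration problem shows up with the offending key names.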
                     
I’m probably missing something simple, but so far I’ve been unable to get this simple data file to ingest into Accumulo using the GeoMesa/Hadoop mechanism (although I can ingest data directly into Accumulo).

Also, I’m not sure what attribute type to associate with the “Time” field in the “attributes” List. I’ve tried “:Time”, “:Timestamp”, “:Datetime”, etc. with no success, so I’ve had to label it simply as “:Integer”.
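For comparison, here is a sketch of how the type specification string could look with a date-typed “Time” attribute. It relies on two things I believe hold for the GeoTools DataUtilities.createType spec syntax: "Date" is an accepted type name (mapping to java.util.Date), and an entry such as "*geom:Point:srid=4326" declares a default point geometry. The geometry entry is an assumption on my part; the attribute list in the listing below has none, while the mapper calls setDefaultGeometry. The class name SensorSpec is hypothetical, and the sketch only builds the string (Java 8 or later for String.join).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: builds a GeoTools-style type spec string.
// It does not call GeoTools or GeoMesa; pass the result to
// DataUtilities.createType(name, spec) in the real ingest class.
class SensorSpec {

    static final List<String> ATTRIBUTES = Arrays.asList(
        "EventID:Integer",
        "Sensor:Integer",
        "Time:Date",              // assumption: date-typed instead of Integer
        "Latitude:Float",
        "Longitude:Float",
        "Altitude:Integer",
        "MinAxis:Float",
        "MajAxis:Float",
        "Orientation:Float",
        "*geom:Point:srid=4326"   // assumption: default geometry, WGS84 lon/lat
    );

    /** Joins the attribute entries with commas, as Joiner.on(",") does in the listing. */
    static String spec() {
        return String.join(",", ATTRIBUTES);
    }

    public static void main(String[] args) {
        System.out.println(spec());
    }
}
```

With a spec like this, the mapper would need to supply ten values per feature (a java.util.Date for “Time” and a Point for “geom”) rather than the nine raw strings from the TSV row.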
                     
Any help or suggestions would be greatly appreciated!

Regards,
Bob Barnhart (barnhartr@xxxxxxxxxx)
                     
// SensorIngest.java
// ===========================================================================================

package geomesa.sensor;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import geomesa.core.index.Constants;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.DataUtilities;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeatureType;

import java.util.*;
                     
public class SensorIngest {

    static String INSTANCE_ID = "instanceId";
    static String ZOOKEEPERS = "zookeepers";
    static String USER = "user";
    static String PASSWORD = "password";
    static String AUTHS = "auths";
    static String TABLE_NAME = "tableName";
    static String FEATURE_NAME = "featureName";
    static String INGEST_FILE = "ingestFile";
    static String[] ACCUMULO_CONNECTION_PARAMS = new String[]{INSTANCE_ID,
            ZOOKEEPERS, USER, PASSWORD, AUTHS, TABLE_NAME};
                     
                       
    static Options getCommonRequiredOptions() {
        Options options = new Options();
        Option instanceIdOpt = OptionBuilder.withArgName(INSTANCE_ID)
                                            .hasArg()
                                            .isRequired()
                                            .withDescription("accumulo connection parameter instanceId")
                                            .create(INSTANCE_ID);
        Option zookeepersOpt = OptionBuilder.withArgName(ZOOKEEPERS)
                                            .hasArg()
                                            .isRequired()
                                            .withDescription("accumulo connection parameter zookeepers")
                                            .create(ZOOKEEPERS);
        Option userOpt = OptionBuilder.withArgName(USER)
                                      .hasArg()
                                      .isRequired()
                                      .withDescription("accumulo connection parameter user")
                                      .create(USER);
        Option passwordOpt = OptionBuilder.withArgName(PASSWORD)
                                          .hasArg()
                                          .isRequired()
                                          .withDescription("accumulo connection parameter password")
                                          .create(PASSWORD);
        Option authsOpt = OptionBuilder.withArgName(AUTHS)
                                       .hasArg()
                                       .withDescription("accumulo connection parameter auths")
                                       .create(AUTHS);
        Option tableNameOpt = OptionBuilder.withArgName(TABLE_NAME)
                                           .hasArg()
                                           .isRequired()
                                           .withDescription("accumulo connection parameter tableName")
                                           .create(TABLE_NAME);
        Option featureNameOpt = OptionBuilder.withArgName(FEATURE_NAME)
                                             .hasArg()
                                             .isRequired()
                                             .withDescription("name of feature in accumulo table")
                                             .create(FEATURE_NAME);
        options.addOption(instanceIdOpt);
        options.addOption(zookeepersOpt);
        options.addOption(userOpt);
        options.addOption(passwordOpt);
        options.addOption(authsOpt);
        options.addOption(tableNameOpt);
        options.addOption(featureNameOpt);
        return options;
    }
                     
                       
    public static Map<String, String> getAccumuloDataStoreConf(CommandLine cmd) {
        Map<String, String> dsConf = new HashMap<String, String>();
        for (String param : ACCUMULO_CONNECTION_PARAMS) {
            dsConf.put(param, cmd.getOptionValue(param));
        }
        if (dsConf.get(AUTHS) == null) dsConf.put(AUTHS, "");
        return dsConf;
    }
                     
                       
    public static void main(String[] args) throws Exception {
        CommandLineParser parser = new BasicParser();
        Options options = getCommonRequiredOptions();
        Option ingestFileOpt = OptionBuilder.withArgName(INGEST_FILE)
                                            .hasArg()
                                            .isRequired()
                                            .withDescription("ingest tsv file on hdfs")
                                            .create(INGEST_FILE);
        options.addOption(ingestFileOpt);

        CommandLine cmd = parser.parse(options, args);
        Map<String, String> dsConf = getAccumuloDataStoreConf(cmd);

        String featureName = cmd.getOptionValue(FEATURE_NAME); // e.g. "sensor"
        SimpleFeatureType featureType = buildFeatureType(featureName);

        DataStore ds = DataStoreFinder.getDataStore(dsConf);
        ds.createSchema(featureType);

        runMapReduceJob(featureName,
                        "GeoMesa SensorEvent Ingest",
                        SensorIngestMapper.class,
                        dsConf,
                        new Path(cmd.getOptionValue(INGEST_FILE)));
    }
                     
                       
    private static void runMapReduceJob(String featureName, String jobName, Class mapper,
                                        Map<String, String> dsConf,
                                        Path mapredCSVFilePath) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(mapper);
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setJobName(jobName);
        Configuration conf = job.getConfiguration();
        for (String key : dsConf.keySet()) {
            conf.set(key, dsConf.get(key));
        }
        conf.set(FEATURE_NAME, featureName);
        FileSystem fs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, mapredCSVFilePath);
        Path tmpPath = new Path("///tmp");
        if (!fs.exists(tmpPath)) {
            fs.create(tmpPath);
        }
        Path outputDir = new Path("///tmp", "geomesa-"+featureName+"-output");
        if (fs.exists(outputDir)) {
            // remove this directory, if it already exists
            fs.delete(outputDir, true);
        }
        Path hdfsJarPath = new Path("///tmp", "geomesa-"+featureName+"-1.0-SNAPSHOT.jar");
        if (fs.exists(hdfsJarPath)) {
            // remove this jar, if it already exists
            fs.delete(hdfsJarPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        fs.copyFromLocalFile(new Path("target/geomesa-"+featureName+"-1.0-SNAPSHOT.jar"), hdfsJarPath);
        for (FileStatus path : fs.listStatus(hdfsJarPath)) {
            job.addArchiveToClassPath(new Path(path.getPath().toUri().getPath()));
        }

        job.submit();

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }
                     
                     
                       
    private static SimpleFeatureType buildFeatureType(String featureName) throws SchemaException {
        String name = featureName;
        String spec = Joiner.on(",").join(attributes);
        SimpleFeatureType featureType = DataUtilities.createType(name, spec);
        // Tells GeoMesa to use "Time" as the Start Time index
        featureType.getUserData().put(Constants.SF_PROPERTY_START_TIME, "Time");
        return featureType;
    }
                     
                       
    static List<String> attributes = Lists.newArrayList(
            "EventID:Integer",
            "Sensor:Integer",
            "Time:Integer",      // TODO: ISSUE I001 - actually a "Unix" time (e.g., millis since 1970.01.01)
            "Latitude:Float",
            "Longitude:Float",
            "Altitude:Integer",
            "MinAxis:Float",
            "MajAxis:Float",
            "Orientation:Float"
    );
}
                     
// SensorIngestMapper.java
// ====================================================================================

package geomesa.sensor;

import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
                     
public class SensorIngestMapper extends Mapper<LongWritable,Text,Key,Value> {

    // Columns: EventID, Sensor, Time, Latitude, Longitude, Altitude, MinAxis, MajAxis, Orientation
    private static int ID_COL_IDX = 0;
    private static int TIME_COL_IDX = 2;
    private static int LATITUDE_COL_IDX  = 3;
    private static int LONGITUDE_COL_IDX = 4;
    private static int MINIMUM_NUM_FIELDS = 9;

    private SimpleFeatureType featureType = null;
    private FeatureWriter<SimpleFeatureType, SimpleFeature> featureWriter = null;
    private SimpleFeatureBuilder featureBuilder;
    private GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();

    private static void _(String s) {
        System.out.println("("+s+")");
    }
                     
                       
    public void setup(Mapper<LongWritable,Text,Key,Value>.Context context) throws IOException, InterruptedException {
        super.setup(context);
        _("SensorIngestMapper.setup: ENTER...");

        Map<String, String> connectionParams = new HashMap<String, String>();
        connectionParams.put("instanceId", context.getConfiguration().get("instanceId"));
        connectionParams.put("zookeepers", context.getConfiguration().get("zookeepers"));
        connectionParams.put("user", context.getConfiguration().get("user"));
        connectionParams.put("password", context.getConfiguration().get("password"));
        connectionParams.put("auths", context.getConfiguration().get("auths"));
        connectionParams.put("tableName", context.getConfiguration().get("tableName"));

        String featureName = context.getConfiguration().get("featureName");
        _("SensorIngestMapper.setup: featureName="+featureName);
        DataStore ds = DataStoreFinder.getDataStore(connectionParams);
        featureType = ds.getSchema(featureName);
        _("SensorIngestMapper.setup: featureType="+featureType.getTypeName());
        featureBuilder = new SimpleFeatureBuilder(featureType);
        _("SensorIngestMapper.setup: featureBuilder OK...");
        featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT); // NullPointerException on this call!!!
        _("SensorIngestMapper.setup: RETURN.");
    }
                     
                       
    public void map(LongWritable key, Text value, Mapper<LongWritable,Text,Key,Value>.Context context) {
        _("SensorIngestMapper.map: ENTER...");
        String[] attributes = value.toString().split("\\t", -1);
        if (attributes.length >= MINIMUM_NUM_FIELDS
                && !attributes[LATITUDE_COL_IDX].equals("")
                && !attributes[LONGITUDE_COL_IDX].equals("")) {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
            try {
                featureBuilder.reset();
                featureBuilder.addAll(attributes);

                Double lat = Double.parseDouble(attributes[LATITUDE_COL_IDX]); // TODO: update for Lat/Lon offsets
                Double lon = Double.parseDouble(attributes[LONGITUDE_COL_IDX]);

                Geometry geom = geometryFactory.createPoint(new Coordinate(lon, lat));
                SimpleFeature simpleFeature = featureBuilder.buildFeature(attributes[ID_COL_IDX]);
                simpleFeature.setAttribute("Time", formatter.parse(attributes[TIME_COL_IDX]));
                simpleFeature.setDefaultGeometry(geom);

                try {
                    SimpleFeature next = featureWriter.next();
                    for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
                        next.setAttribute(i, simpleFeature.getAttribute(i));
                    }
                    ((FeatureIdImpl)next.getIdentifier()).setID(simpleFeature.getID());
                    featureWriter.write();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        _("SensorIngestMapper.map: RETURN ...");
    }
}
                     
Bob Barnhart
Chief Systems Engineer | 858 826 5596 (Office) | 619 972 9489 (Mobile) | barnhartr@xxxxxxxxxx
                      
                     
                     
                     
                     