(My apologies if this is a re-post due to email-address issues.)
               
I have experienced problems ingesting any of the sample GDELT data cited in the GeoMesa tutorial instructions at http://geomesa.github.io/2014/04/17/geomesa-gdelt-analysis/.
               
Consequently, I tried to adapt the code from https://github.com/ccri/geomesa-gdelt to ingest a different data set.
               
My data (in CSV format; I converted it to TSV for ingestion) looks like:
               
EventID,Sensor,Time,Latitude,Longitude,Altitude,MinAxis,MajAxis,Orientation
1,123,1400086863513,36.966,-82.589,0,792.458,960.222,337.966
2,123,1400086864516,35.783,-82.647,0,229.497,552.263,66.236
3,123,1400086865518,37.215,-84.237,0,283.729,825.958,38.794
4,123,1400086866521,37.609,-82.255,0,737.285,919.335,108.117
5,123,1400086867523,35.053,-82.897,0,710.054,942.109,46.797
6,123,1400086868525,37.145,-84.328,0,695.256,976.911,40.299
7,123,1400086869528,34.224,-84.809,0,645.225,936.959,260.217
8,123,1400086870530,36.826,-85.623,0,430.444,794.505,257.836
9,123,1400086871542,34.846,-86.305,0,727.479,732.795,8.621
10,123,1400086872556,33.861,-86.148,0,700.236,761.757,160.848
...
               
(Note: the “Time” field is a Unix/Java-style timestamp, i.e., milliseconds since 1970-01-01.)
               
I adapted the GDELTIngest class to SensorIngest.java, as shown below.
               
I also adapted the GDELTIngestMapper class to SensorIngestMapper.java, as shown below.
               
I essentially used the technique from the tutorial to upload my data file to HDFS (after converting it to TSV format), roughly as shown below.
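
The upload amounted to something like the following (a sketch: the exact local filename is illustrative; the HDFS path matches the -ingestFile argument in the ingest command further down):

hadoop fs -mkdir /sensor
hadoop fs -put sensor.tsv /sensor/sensor.tsv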
               
I then used basically the same ingestion command from the tutorial:
               
hadoop jar ./target/geomesa-sensor-1.0-SNAPSHOT.jar geomesa.sensor.SensorIngest \
    -instanceId ntc-irad \
    -zookeepers localhost:2181 \
    -user root \
    -password “…” \
    -auths public \
    -tableName sensor \
    -featureName sensor \
    -ingestFile hdfs:///sensor/sensor.tsv
               
The ingestion begins and the “sensor” table is created in Accumulo, but after processing 4 “entries” the ingestion fails with the following NullPointerException (in Hadoop’s MapReduce?):
               
java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
    at geomesa.core.data.AccumuloFeatureReader.<init>(AccumuloFeatureReader.scala:42)
    at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:283)
    at geomesa.core.data.AccumuloDataStore.getFeatureReader(AccumuloDataStore.scala:275)
    at geomesa.core.data.ModifyAccumuloFeatureWriter.<init>(AccumuloFeatureWriter.scala:146)
    at geomesa.core.data.AccumuloDataStore.createFeatureWriter(AccumuloDataStore.scala:294)
    at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:470)
    at org.geotools.data.AbstractDataStore.getFeatureWriter(AbstractDataStore.java:503)
    at geomesa.sensor.SensorIngestMapper.setup(SensorIngestMapper.java:64)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/05/14 13:44:37 INFO mapreduce.Job: Job job_local1747290386_0001 failed with state FAILED due to: NA
14/05/14 13:44:37 INFO mapreduce.Job: Counters: 0
Exception in thread "main" java.lang.Exception: Job failed
    at geomesa.sensor.SensorIngest.runMapReduceJob(SensorIngest.java:156)
    at geomesa.sensor.SensorIngest.main(SensorIngest.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
               
The line tagged with “!!!” in the SensorIngestMapper.java listing below is where the code dies (in the DataStore.getFeatureWriter call).
               
I’m probably missing something simple, but so far I’ve been unable to get this small data file to ingest into Accumulo using the GeoMesa/Hadoop mechanism, although I can ingest data directly into Accumulo (roughly as sketched below).
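
For context, by “directly” I mean plain Accumulo writes along these lines (a sketch only; exception handling omitted, imports from org.apache.accumulo.core.client.* and org.apache.accumulo.core.data.*, and the “sensor_raw” table name is just an example):

ZooKeeperInstance instance = new ZooKeeperInstance("ntc-irad", "localhost:2181");
Connector conn = instance.getConnector("root", new PasswordToken("…"));
BatchWriter writer = conn.createBatchWriter("sensor_raw", new BatchWriterConfig());
Mutation m = new Mutation("1");                                // row per EventID
m.put("attrs", "Time", new Value("1400086863513".getBytes())); // one column per field
writer.addMutation(m);
writer.close();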
               
Also, I’m not sure what attribute type to associate with the “Time” field in the “attributes” list. I’ve tried “:Time”, “:Timestamp”, “:Datetime”, etc. with no success, so I’ve had to label it simply as “:Integer”. (A sketch of what I’d expect to work follows.)
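
For what it’s worth, here is a minimal sketch of what I’d expect to work, assuming GeoTools binds a “Date” attribute type to java.util.Date (unconfirmed on my end against GeoMesa’s temporal index):

// (Sketch) In SensorIngest.attributes, declare the field as a Date:
//     "Time:Date",
// and in SensorIngestMapper.map(), replace the SimpleDateFormat parse with:
long millis = Long.parseLong(attributes[TIME_COL_IDX]); // raw epoch-millis string
simpleFeature.setAttribute("Time", new java.util.Date(millis));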
               
Any help or suggestions would be greatly appreciated!
               
Regards,
Bob Barnhart (barnhartr@xxxxxxxxxx)
               
// SensorIngest.java ===========================================================================================
               
package geomesa.sensor;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import geomesa.core.index.Constants;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.DataUtilities;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeatureType;

import java.util.*;

public class SensorIngest {

    static String INSTANCE_ID = "instanceId";
    static String ZOOKEEPERS = "zookeepers";
    static String USER = "user";
    static String PASSWORD = "password";
    static String AUTHS = "auths";
    static String TABLE_NAME = "tableName";
    static String FEATURE_NAME = "featureName";
    static String INGEST_FILE = "ingestFile";
    static String[] ACCUMULO_CONNECTION_PARAMS = new String[]{INSTANCE_ID,
            ZOOKEEPERS, USER, PASSWORD, AUTHS, TABLE_NAME};

    static Options getCommonRequiredOptions() {
        Options options = new Options();
        Option instanceIdOpt = OptionBuilder.withArgName(INSTANCE_ID)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter instanceId")
                .create(INSTANCE_ID);
        Option zookeepersOpt = OptionBuilder.withArgName(ZOOKEEPERS)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter zookeepers")
                .create(ZOOKEEPERS);
        Option userOpt = OptionBuilder.withArgName(USER)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter user")
                .create(USER);
        Option passwordOpt = OptionBuilder.withArgName(PASSWORD)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter password")
                .create(PASSWORD);
        Option authsOpt = OptionBuilder.withArgName(AUTHS)
                .hasArg()
                .withDescription("accumulo connection parameter auths")
                .create(AUTHS);
        Option tableNameOpt = OptionBuilder.withArgName(TABLE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("accumulo connection parameter tableName")
                .create(TABLE_NAME);
        Option featureNameOpt = OptionBuilder.withArgName(FEATURE_NAME)
                .hasArg()
                .isRequired()
                .withDescription("name of feature in accumulo table")
                .create(FEATURE_NAME);
        options.addOption(instanceIdOpt);
        options.addOption(zookeepersOpt);
        options.addOption(userOpt);
        options.addOption(passwordOpt);
        options.addOption(authsOpt);
        options.addOption(tableNameOpt);
        options.addOption(featureNameOpt);
        return options;
    }

    public static Map<String, String> getAccumuloDataStoreConf(CommandLine cmd) {
        Map<String, String> dsConf = new HashMap<String, String>();
        for (String param : ACCUMULO_CONNECTION_PARAMS) {
            dsConf.put(param, cmd.getOptionValue(param));
        }
        if (dsConf.get(AUTHS) == null) dsConf.put(AUTHS, "");
        return dsConf;
    }

    public static void main(String[] args) throws Exception {
        CommandLineParser parser = new BasicParser();
        Options options = getCommonRequiredOptions();
        Option ingestFileOpt = OptionBuilder.withArgName(INGEST_FILE)
                .hasArg()
                .isRequired()
                .withDescription("ingest tsv file on hdfs")
                .create(INGEST_FILE);
        options.addOption(ingestFileOpt);

        CommandLine cmd = parser.parse(options, args);
        Map<String, String> dsConf = getAccumuloDataStoreConf(cmd);

        String featureName = cmd.getOptionValue(FEATURE_NAME); // e.g. "sensor"
        SimpleFeatureType featureType = buildFeatureType(featureName);

        DataStore ds = DataStoreFinder.getDataStore(dsConf);
        ds.createSchema(featureType);

        runMapReduceJob(featureName,
                "GeoMesa SensorEvent Ingest",
                SensorIngestMapper.class,
                dsConf,
                new Path(cmd.getOptionValue(INGEST_FILE)));
    }

    private static void runMapReduceJob(String featureName, String jobName, Class mapper,
                                        Map<String, String> dsConf,
                                        Path mapredCSVFilePath) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(mapper);
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setJobName(jobName);
        Configuration conf = job.getConfiguration();
        for (String key : dsConf.keySet()) {
            conf.set(key, dsConf.get(key));
        }
        conf.set(FEATURE_NAME, featureName);
        FileSystem fs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, mapredCSVFilePath);
        Path tmpPath = new Path("///tmp");
        if (!fs.exists(tmpPath)) {
            fs.create(tmpPath);
        }
        Path outputDir = new Path("///tmp", "geomesa-" + featureName + "-output");
        if (fs.exists(outputDir)) {
            // remove this directory, if it already exists
            fs.delete(outputDir, true);
        }
        Path hdfsJarPath = new Path("///tmp", "geomesa-" + featureName + "-1.0-SNAPSHOT.jar");
        if (fs.exists(hdfsJarPath)) {
            // remove this jar, if it already exists
            fs.delete(hdfsJarPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        fs.copyFromLocalFile(new Path("target/geomesa-" + featureName + "-1.0-SNAPSHOT.jar"), hdfsJarPath);
        for (FileStatus path : fs.listStatus(hdfsJarPath)) {
            job.addArchiveToClassPath(new Path(path.getPath().toUri().getPath()));
        }

        job.submit();

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }

    private static SimpleFeatureType buildFeatureType(String featureName) throws SchemaException {
        String name = featureName;
        String spec = Joiner.on(",").join(attributes);
        SimpleFeatureType featureType = DataUtilities.createType(name, spec);
        // Tells GeoMesa to use "Time" as the Start Time index
        featureType.getUserData().put(Constants.SF_PROPERTY_START_TIME, "Time");
        return featureType;
    }

    static List<String> attributes = Lists.newArrayList(
            "EventID:Integer",
            "Sensor:Integer",
            "Time:Integer",     // TODO: ISSUE I001 - actually a "Unix" time (e.g., millis since 1970-01-01)
            "Latitude:Float",
            "Longitude:Float",
            "Altitude:Integer",
            "MinAxis:Float",
            "MajAxis:Float",
            "Orientation:Float"
    );
}
               
// SensorIngestMapper.java ====================================================================================
               
package geomesa.sensor;

import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

public class SensorIngestMapper extends Mapper<LongWritable,Text,Key,Value> {

    // Columns: EventID, Sensor, Time, Latitude, Longitude, Altitude, MinAxis, MajAxis, Orientation
    private static int ID_COL_IDX = 0;
    private static int TIME_COL_IDX = 2;
    private static int LATITUDE_COL_IDX = 3;
    private static int LONGITUDE_COL_IDX = 4;
    private static int MINIMUM_NUM_FIELDS = 9;

    private SimpleFeatureType featureType = null;
    private FeatureWriter<SimpleFeatureType, SimpleFeature> featureWriter = null;
    private SimpleFeatureBuilder featureBuilder;
    private GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();

    private static void _(String s) {
        System.out.println("(" + s + ")");
    }

    public void setup(Mapper<LongWritable,Text,Key,Value>.Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        _("SensorIngestMapper.setup: ENTER...");

        Map<String, String> connectionParams = new HashMap<String, String>();
        connectionParams.put("instanceId", context.getConfiguration().get("instanceId"));
        connectionParams.put("zookeepers", context.getConfiguration().get("zookeepers"));
        connectionParams.put("user", context.getConfiguration().get("user"));
        connectionParams.put("password", context.getConfiguration().get("password"));
        connectionParams.put("auths", context.getConfiguration().get("auths"));
        connectionParams.put("tableName", context.getConfiguration().get("tableName"));

        String featureName = context.getConfiguration().get("featureName");
        _("SensorIngestMapper.setup: featureName=" + featureName);
        DataStore ds = DataStoreFinder.getDataStore(connectionParams);
        featureType = ds.getSchema(featureName);
        _("SensorIngestMapper.setup: featureType=" + featureType.getTypeName());
        featureBuilder = new SimpleFeatureBuilder(featureType);
        _("SensorIngestMapper.setup: featureBuilder OK...");
        featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT); // NullPointerException on this call!!!
        _("SensorIngestMapper.setup: RETURN.");
    }

    public void map(LongWritable key, Text value, Mapper<LongWritable,Text,Key,Value>.Context context) {
        _("SensorIngestMapper.map: ENTER...");
        String[] attributes = value.toString().split("\\t", -1);
        if (attributes.length >= MINIMUM_NUM_FIELDS
                && !attributes[LATITUDE_COL_IDX].equals("")
                && !attributes[LONGITUDE_COL_IDX].equals("")) {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
            try {
                featureBuilder.reset();
                featureBuilder.addAll(attributes);

                Double lat = Double.parseDouble(attributes[LATITUDE_COL_IDX]); // TODO: update for Lat/Lon offsets
                Double lon = Double.parseDouble(attributes[LONGITUDE_COL_IDX]);

                Geometry geom = geometryFactory.createPoint(new Coordinate(lon, lat));
                SimpleFeature simpleFeature = featureBuilder.buildFeature(attributes[ID_COL_IDX]);
                simpleFeature.setAttribute("Time", formatter.parse(attributes[TIME_COL_IDX]));
                simpleFeature.setDefaultGeometry(geom);

                try {
                    SimpleFeature next = featureWriter.next();
                    for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
                        next.setAttribute(i, simpleFeature.getAttribute(i));
                    }
                    ((FeatureIdImpl) next.getIdentifier()).setID(simpleFeature.getID());
                    featureWriter.write();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        _("SensorIngestMapper.map: RETURN ...");
    }
}
               
Bob Barnhart
Chief Systems Engineer | 858 826 5596 (Office) | 619 972 9489 (Mobile) | barnhartr@xxxxxxxxxx