Hi Adam,
I haven't seen an example of writing to GeoMesa via Spark. Since a
SimpleFeature is written as 3 or more Accumulo mutations across 2 or
more tables, it would take a little work to sort out a proper Spark
output format.* We may or may not be able to turn something like that
around quickly.
That said, there are two other options:
First, if you are running locally or if the data is small enough,
you can 'collect' the SimpleFeatures you wish to write to GeoMesa
and then write them directly by getting a FeatureStore and calling
addFeatures. Admittedly, this approach has some scale limitations.
I only suggest it while we work out something a little more robust.
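A minimal sketch of that first option, assuming queryRDD is an
RDD[SimpleFeature] and params is the usual map of AccumuloDataStore
connection parameters (both names are placeholders):

  import scala.collection.JavaConverters._
  import org.geotools.data.DataStoreFinder
  import org.geotools.data.collection.ListFeatureCollection
  import org.locationtech.geomesa.core.data.{AccumuloDataStore, AccumuloFeatureStore}

  // Pull everything back to the driver -- only sensible for small results.
  val features = queryRDD.collect()

  val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
  val fs = ds.getFeatureSource("OutputFeatureName").asInstanceOf[AccumuloFeatureStore]
  fs.addFeatures(new ListFeatureCollection(fs.getSchema, features.toList.asJava))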
Second, one should be able to distribute the map of parameters
required to build a GeoMesa AccumuloDataStore to the Spark executors,
construct a FeatureStore on each node, and write features from there.
I'm thinking something like ...
  import scala.collection.JavaConverters._
  import org.geotools.data.DataStoreFinder
  import org.geotools.data.collection.ListFeatureCollection
  import org.locationtech.geomesa.core.data.{AccumuloDataStore, AccumuloFeatureStore}

  // foreachPartition hands us an Iterator[SimpleFeature], so we can build
  // the (non-serializable) DataStore once per partition, on the executor.
  queryRDD.foreachPartition { iter =>
    val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
    val fs = ds.getFeatureSource("OutputFeatureName").asInstanceOf[AccumuloFeatureStore]
    iter.foreach { sf =>
      fs.addFeatures(new ListFeatureCollection(fs.getSchema, List(sf).asJava))
    }
  }
I say 'one should be able to' because when I tried this tonight, I
received errors like:
  14/12/09 18:20:01 INFO scheduler.DAGScheduler: Failed to run collect at <console>:41
  org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.locationtech.geomesa.core.data.AccumuloDataStore
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
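For what it's worth, that exception usually means an AccumuloDataStore
built on the driver got captured by the task closure; AccumuloDataStore
is not Serializable, so Spark cannot ship it. (In the shell, an outer
'ds' in scope can get pulled in even if it isn't used inside the loop.)
A minimal illustration of the difference, again with the hypothetical
params and queryRDD:

  import org.geotools.data.DataStoreFinder
  import org.locationtech.geomesa.core.data.AccumuloDataStore

  // Fails: `ds` is created on the driver, and the closure that
  // references it cannot be serialized.
  val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
  queryRDD.foreach { sf => ds.getFeatureSource("OutputFeatureName") }

  // Works: only the string-valued parameter map is captured, and the
  // store is built on the executor, as in the sketch above.
  queryRDD.foreach { sf =>
    val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
    ds.getFeatureSource("OutputFeatureName")
  }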
Thanks again for the questions about Spark!
Jim
* One would need to get a FeatureStore inside the output format
class/object to create the actual Accumulo mutations; a rough sketch of
what that might look like follows.
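For the curious, here is a hypothetical, untested skeleton of such an
OutputFormat. The 'geomesa.*' configuration keys are made up for the
example; the data store parameter names are the usual AccumuloDataStore
ones.

  import java.io.Serializable
  import org.apache.hadoop.mapreduce._
  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
  import org.geotools.data.DataStoreFinder
  import org.geotools.data.collection.ListFeatureCollection
  import org.locationtech.geomesa.core.data.{AccumuloDataStore, AccumuloFeatureStore}
  import org.opengis.feature.simple.SimpleFeature
  import scala.collection.JavaConverters._

  class GeoMesaOutputFormat extends OutputFormat[Void, SimpleFeature] {
    override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Void, SimpleFeature] =
      new RecordWriter[Void, SimpleFeature] {
        private val conf = context.getConfiguration
        // Rebuild the connection parameter map shipped via the job config.
        private val params = Map[String, Serializable](
          "instanceId" -> conf.get("geomesa.instance"),
          "zookeepers" -> conf.get("geomesa.zookeepers"),
          "user"       -> conf.get("geomesa.user"),
          "password"   -> conf.get("geomesa.password"),
          "tableName"  -> conf.get("geomesa.catalog")).asJava
        private val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]
        private val fs = ds.getFeatureSource(conf.get("geomesa.feature.name"))
          .asInstanceOf[AccumuloFeatureStore]
        override def write(key: Void, sf: SimpleFeature): Unit =
          fs.addFeatures(new ListFeatureCollection(fs.getSchema, List(sf).asJava))
        override def close(context: TaskAttemptContext): Unit = ds.dispose()
      }
    override def checkOutputSpecs(context: JobContext): Unit = {}
    override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter =
      new NullOutputFormat[Void, SimpleFeature]().getOutputCommitter(context)
  }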
On 12/09/2014 05:29 PM, Adam F wrote:
Does anyone have an example of how to write to GeoMesa from
Spark? This is what I have so far; I'd welcome any feedback.
  val job = new Job()
  val config = new Configuration

  ConfiguratorBase.setConnectorInfo(classOf[AccumuloOutputFormat],
    config, ds.connector.whoami(), ds.authToken)
  ConfiguratorBase.setZooKeeperInstance(classOf[AccumuloOutputFormat],
    config, ds.connector.getInstance().getInstanceName,
    ds.connector.getInstance().getZooKeepers)
  OutputConfigurator.setDefaultTableName(classOf[AccumuloOutputFormat],
    config, ds.getSpatioTemporalIdxTableName(sft))

  val output = inputRDD.map(toFeature)
    .saveAsNewAPIHadoopFile(config.getString("accumulo.instance"),
      classOf[Void],
      classOf[SimpleFeature],
      classOf[AccumuloOutputFormat],
      job.getConfiguration)
Assume I already have the GeoMesa tables created and a handle to the
data store in the form of ds. I also have the SimpleFeatureType as
sft. Something tells me I have the default table name identified
incorrectly. Also, I'm not sure what should go in all the classOf
calls.
Thanks
-- Adam