Still having trouble with GeoMesa's Spark package. In my earlier post I mentioned getting an AccumuloException. My trace of what's happening points to this line in the rdd function:

InputConfigurator.setOfflineTableScan(classOf[AccumuloInputFormat], conf, true)

This call is supposed to enable the use of offline tables, but it seems to force execution down a path that ends in the exception whenever the table is online (which mine is). Since offline tables are not a major issue for my project, I simply changed the flag to false so I could move on; I'll revisit it later.
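For reference, the workaround amounts to flipping the final argument (a local edit to GeoMesa's rdd function to unblock myself, not a proper fix):

```scala
// Local workaround: skip the offline-table scan path, since my table is
// online. classOf[AccumuloInputFormat] and conf are the values already
// in scope in GeoMesa's rdd function.
InputConfigurator.setOfflineTableScan(classOf[AccumuloInputFormat], conf, false)
```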
When I run the code after making this change, I'm now
getting a serialization error.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.locationtech.geomesa.core.data.AvroFeatureDecoder
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/11/26 11:10:35 INFO BlockManagerInfo: Registering block manager aspen-vm-019.aspen.local:41483 with 2.3 GB RAM
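Before digging into the GeoMesa code, here is a minimal, self-contained sketch of what "Task not serializable" means. It uses plain java.io serialization, which is what Spark applies to task closures before shipping them to executors. FakeDecoder is a hypothetical stand-in for AvroFeatureDecoder, and the "construct it inside the partition function" fix is the standard pattern for this error, not something taken from the GeoMesa source:

```scala
import java.io._

// Hypothetical stand-in for AvroFeatureDecoder: a class that does NOT
// implement Serializable, so it cannot travel inside a task closure.
class FakeDecoder {
  def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

object SerializationDemo {
  // Mimic what Spark does to a task closure before sending it to executors:
  // write it through an ObjectOutputStream. Returns the failure message, if any.
  def trySerialize(obj: AnyRef): Option[String] =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    // Broken pattern: the decoder is created on the "driver" and captured
    // by the closure, so serializing the closure drags the decoder along.
    val decoder = new FakeDecoder
    val broken: Array[Byte] => String = bytes => decoder.decode(bytes)
    println(trySerialize(broken))   // fails: the closure captures FakeDecoder

    // Working pattern: build the decoder inside the partition function, so
    // the closure itself captures nothing non-serializable.
    val working: Iterator[Array[Byte]] => Iterator[String] = { iter =>
      val local = new FakeDecoder   // constructed per partition, never shipped
      iter.map(local.decode)
    }
    println(trySerialize(working))  // succeeds: nothing to capture
  }
}
```

The broken closure fails because the captured FakeDecoder instance must travel with it; the working one constructs the decoder per partition on the executor, so nothing non-serializable ever crosses the wire.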
I think the problem is in the mapPartitions function within the rdd function. The lines:
val encoder = new AvroFeatureEncoder(sft)