Re: [geomesa-users] Problem with spark example

Adam,

I've put up a branch here:

https://github.com/locationtech/geomesa/tree/f_sparkfix

Were there any other changes you made to make things work?

The diff is available here:
https://github.com/locationtech/geomesa/compare/f_sparkfix

Thanks for pointing those things out.

Andrew

On 11/26/2014 12:38 PM, Andrew Hulbert wrote:
Adam,

For tracking purposes I have created this ticket...feel free to comment/add info.

https://geomesa.atlassian.net/browse/GEOMESA-526

Andrew

On 11/26/2014 12:33 PM, Andrew Hulbert wrote:
Hi Adam,

We're currently looking for a solution to the serializability of the Avro encoders/decoders. We were experimenting with MeatLocker from Twitter's chill library (https://github.com/twitter/chill#the-meatlocker), but it didn't work out of the box because of dependency errors, so we're trying to sort those out.
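For readers following along, here is a minimal sketch of one common way to ship a non-serializable helper inside a serializable object. It is not MeatLocker (which wraps a value using Kryo) and not GeoMesa's fix; it is a plain-Scala alternative using a @transient lazy val, assuming Scala 2.x. FakeCodec, CodecHolder, HolderSketch and the spec string are hypothetical names invented for the illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for an encoder/decoder that does not implement Serializable.
class FakeCodec(val spec: String)

// Serializable wrapper: only `spec` is written to the stream. The codec field is
// transient, so it is skipped on write and rebuilt lazily after deserialization.
class CodecHolder(val spec: String) extends Serializable {
  @transient lazy val codec: FakeCodec = new FakeCodec(spec)
}

object HolderSketch {
  // Serializes and deserializes an object with plain Java serialization.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new CodecHolder("example-spec") // hypothetical spec string
    original.codec                                 // force the codec before serializing
    val copy = roundTrip(original)
    println(copy.codec.spec)                       // codec rebuilt from the spec
    println(copy.codec eq original.codec)          // a different instance than the original
  }
}
```

The trade-off is that the helper is rebuilt once per deserialized copy, which only works when it can be reconstructed from serializable inputs such as the type name and spec.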

We'll get back to you soon.

Andrew

On 11/26/2014 11:49 AM, Adam F wrote:
Still having trouble with Geomesa's Spark package.  In my earlier post I mentioned getting an AccumuloException.  My trace of what's happening seems to point to the line

InputConfigurator.setOfflineTableScan(classOf[AccumuloInputFormat], conf, true)

in the rdd function.  This function is supposed to enable the use of offline tables, but it seems to force execution down a path that only leads to the exception if the table is online (which mine is).  Since offline tables are not a major issue for my project, I simply changed it to false so I could move on.  I'll re-address later.

When I run the code after making this change, I'm now getting a serialization error.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.locationtech.geomesa.core.data.AvroFeatureDecoder
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/11/26 11:10:35 INFO BlockManagerInfo: Registering block manager aspen-vm-019.aspen.local:41483 with 2.3 GB RAM

I think the problem is in the mapPartitions function within the rdd function. The lines:

val sft = SimpleFeatureTypes.createType(typeName, spec)
val encoder = new AvroFeatureEncoder(sft)

don't seem to contribute anything to the transformation of the partition.  That is, encoder is not used.  This made me think there is a typo or something.  I changed

val encoder = new AvroFeatureEncoder(sft)

to

val decoder = new AvroFeatureDecoder(sft)

and the error went away and my project ran to completion. 
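One plausible reading of why this change helps is that the decoder is now constructed inside the function that runs on each partition, rather than being captured from the enclosing scope and serialized with the task. The sketch below illustrates that general effect with plain Java serialization and no Spark or GeoMesa dependency. FakeDecoder, CapturingTask, BuildInsideTask and the spec string are hypothetical stand-ins, not GeoMesa classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a decoder that does not implement Serializable.
class FakeDecoder(spec: String) {
  def decode(bytes: Array[Byte]): String = spec + ":" + new String(bytes, "UTF-8")
}

// Keeps the decoder as a field, so serializing the task drags the decoder along.
class CapturingTask(decoder: FakeDecoder) extends Serializable {
  def run(rows: Iterator[Array[Byte]]): Iterator[String] = rows.map(r => decoder.decode(r))
}

// Keeps only the serializable spec string and builds the decoder where the work runs.
class BuildInsideTask(spec: String) extends Serializable {
  def run(rows: Iterator[Array[Byte]]): Iterator[String] = {
    val decoder = new FakeDecoder(spec)
    rows.map(r => decoder.decode(r))
  }
}

object SerializationSketch {
  // Returns true if Java serialization accepts the object.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val spec = "example-spec" // hypothetical spec string
    println(serializes(new CapturingTask(new FakeDecoder(spec)))) // false
    println(serializes(new BuildInsideTask(spec)))                // true
  }
}
```

In Spark terms, BuildInsideTask corresponds to creating the decoder inside the mapPartitions body, so only the type name and spec strings need to travel to the executors.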

Thanks
-- Adam



On Tue, Nov 25, 2014 at 8:38 AM, Adam F <tigclem00@xxxxxxxxx> wrote:
Jim,

I should have mentioned a few things earlier.  First, the issue with the core.index package object seems to occur when the package object is initialized.  This seems to take place before my Spark job is initiated.  Here is the complete output of the error:

Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.locationtech.geomesa.core.data.AccumuloDataStore$$anonfun$getSchema$1.apply(AccumuloDataStore.scala:712)
        at org.locationtech.geomesa.core.data.AccumuloDataStore$$anonfun$getSchema$1.apply(AccumuloDataStore.scala:705)
        at scala.Option.map(Option.scala:145)
        at org.locationtech.geomesa.core.data.AccumuloDataStore.getSchema(AccumuloDataStore.scala:705)
        at org.locationtech.geomesa.core.data.AccumuloDataStore.getSchema(AccumuloDataStore.scala:701)
        at runnables.GeomesaInputRunnable.readInput(GeomesaInputRunnable.scala:51)
        at com.apex.CEngine$class.run(CEngine.scala:48)
        at runnables.GeomesaInputRunnable.run(GeomesaInputRunnable.scala:36)
        at com.apex.CRun$.runJob(CRun.scala:120)
        at com.apex.CRun$.run(CRun.scala:105)
        at com.apex.CRun$.main(CRun.scala:49)
        at com.apex.CRun.main(CRun.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArithmeticException: Adding time zone offset caused overflow
        at org.joda.time.DateTimeZone.convertUTCToLocal(DateTimeZone.java:965)
        at org.joda.time.chrono.ZonedChronology$ZonedDateTimeField.get(ZonedChronology.java:422)
        at org.joda.time.base.BaseDateTime.<init>(BaseDateTime.java:129)
        at org.joda.time.base.BaseDateTime.<init>(BaseDateTime.java:97)
        at org.joda.time.DateTime.<init>(DateTime.java:209)
        at org.locationtech.geomesa.core.index.package$.<init>(index.scala:36)
        at org.locationtech.geomesa.core.index.package$.<clinit>(index.scala)
        ... 19 more


Second, I'd like to point you to Sotera's Aggregate Micropath GitHub project (https://github.com/Sotera/aggregate-micro-paths).  I've used it as a template of sorts for creating my Spark analytics.  If you look at their Spark project you'll notice it includes a runnable package.  This is where they handle input formats.  I'm essentially creating a new runnable to work with Geomesa.  I've also updated my analytic template to work with Spark-Jobserver (https://github.com/ooyala/spark-jobserver).  While my project works with Jobserver, I find it easier (for now) to just use spark-submit.

Finally, you asked if I got my Spark job to run successfully.  No, I haven't just yet.  After figuring out the issue with my use of DateTime, I'm now getting what looks like an Accumulo error:

Exception in thread "main" java.io.IOException: org.apache.accumulo.core.client.AccumuloException: Table is online geomesa(e) cannot scan table in offline mode
        at org.apache.accumulo.core.client.mapreduce.InputFormatBase.getSplits(InputFormatBase.java:868)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
        at org.apache.spark.rdd.RDD.count(RDD.scala:847)
        at com.apex.CEngine$class.run(CEngine.scala:49)
        at runnables.GeomesaInputRunnable.run(GeomesaInputRunnable.scala:36)
        at com.apex.CRun$.runJob(CRun.scala:120)
        at com.apex.CRun$.run(CRun.scala:105)
        at com.apex.CRun$.main(CRun.scala:49)
        at com.apex.CRun.main(CRun.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.accumulo.core.client.AccumuloException: Table is online geomesa(e) cannot scan table in offline mode
        at org.apache.accumulo.core.client.mapreduce.InputFormatBase.binOfflineTable(InputFormatBase.java:712)
        at org.apache.accumulo.core.client.mapreduce.InputFormatBase.getSplits(InputFormatBase.java:841)
        ... 30 more

The table I'm working with inside Geomesa is called geomesa.  I've just started to review this error.  The output before the error is here:

spark.default.parallelism 8
spark.akka.frameSize 200
spark.storage.memoryFraction 0.5
check point dir: /tmp/checkpoints
Running on geomesa table: Some(geomesa)
Scanning ST index table for feature type geomesa
Filter: [ geom bbox POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)) ]
Geometry filters: List([ geom bbox POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)) ])
Temporal filters: List()
Other filters: List()
Tweaked geom filters are List([ Location bbox POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)) ])
GeomsToCover: List(POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)))
Planning query
Random Partition Planner: Vector(00, 01, 02, 03, 04, 05, 06, 07, 08, 09, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
ConstPlanner: geomesa
GeoHashKeyPlanner: 4 : List(..., t.., tt., ttb)
DatePlanner: start: 00000101 end: 99991231
Total ranges: 400
ColumnFamily Planner: 109 : List(.., 8., 8y, 8z, 9., 9n, 9p, 9q, 9r, 9w, 9x, 9y, 9z, b., bb, bc, bf, bg, bu, bv)
STII Filter: [ Location bbox POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5)) ]
Interval:  No interval
Filter: SpatialFilter(GEOMETRYCOLLECTION (POLYGON ((68.5 32.5, 68.5 32.9, 68.9 32.9, 68.9 32.5, 68.5 32.5))))

Please let me know if you've seen this error before.  If not, I'll report what I discover during my investigation.

Thanks
-- Adam





On Mon, Nov 24, 2014 at 4:29 PM, Jim Hughes <jnh5y@xxxxxxxx> wrote:
Hi Adam,

I was just about to write to share what I know.  I tried to replicate your issue, and I made it a little further than what you reported.  Thanks for sharing what you learned about Joda DateTime versions being slightly out of sync.

My efforts focused on using the Spark-shell, and I ran into some other serialization issues.  I've worked with Andrew Hulbert a few minutes this evening to try and address things.  We'll be looking into things some more.  He said he'd have a little more time to look into things tomorrow.

Out of curiosity, did you manage to get your Spark job to run successfully?  If so, how have you been submitting your jobs?

Thanks for asking a great question,

Jim



On 11/24/2014 04:58 PM, Adam F wrote:
I've been working on the error and discovered it was due to conflicting versions of Joda-Time.  My Spark job was requiring 2.4 and Geomesa requires 2.3.  When I packaged my job into an uber jar, it was only packaging 2.4.  This is an issue because version 2.4 apparently has problems with using Long.MinValue and Long.MaxValue (reference https://github.com/JodaOrg/joda-time/issues/190) when initializing a DateTime object.  This same approach is used to initialize a few DateTime objects in org.locationtech.geomesa.core.index.scala (line 36/37).
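The message "Adding time zone offset caused overflow" suggests simple arithmetic: shifting an instant that already sits at the edge of the Long range by a non-zero zone offset cannot be represented. The sketch below does not reproduce Joda-Time's code; it only illustrates that arithmetic with java.lang.Math.addExact (Java 8 or later). OffsetOverflowSketch, toLocalMillis and the five-hour offset are hypothetical names and values chosen for the example.

```scala
object OffsetOverflowSketch {
  // Adds a zone offset (in ms) to a UTC instant (in ms), reporting overflow
  // as a Left instead of silently wrapping around.
  def toLocalMillis(utcMillis: Long, offsetMillis: Long): Either[String, Long] =
    try Right(java.lang.Math.addExact(utcMillis, offsetMillis))
    catch { case e: ArithmeticException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    val fiveHoursWest = -5L * 60 * 60 * 1000 // hypothetical zone offset of UTC-5

    println(toLocalMillis(0L, fiveHoursWest))            // ordinary instant: Right
    println(toLocalMillis(Long.MinValue, 0L))            // zero offset: Right
    println(toLocalMillis(Long.MinValue, fiveHoursWest)) // edge instant, negative offset: Left
  }
}
```

This is consistent with the failure depending on the default time zone of the machine running the job, although the thread does not confirm that detail.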

Thanks
-- Adam
 

On Fri, Nov 21, 2014 at 9:17 PM, Adam Fraser <tigclem00@xxxxxxxxx> wrote:

I'm working with Spark and attempting to emulate the Spark example (http://www.geomesa.org/2014/08/05/spark/), but am having problems. 

The example creates a dataStore using:

val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]

 It then sends ds into an init function found in compute.spark.GeoMesaSpark. This init function calls ds.getSchema.  This is where the problem is occurring.  I'm getting the following error:

 

Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.locationtech.geomesa.core.data.AccumuloDataStore$$anonfun$getSchema$2.apply(AccumuloDataStore.scala:712)
        at org.locationtech.geomesa.core.data.AccumuloDataStore$$anonfun$getSchema$2.apply(AccumuloDataStore.scala:703)
        at scala.Option.map(Option.scala:145)
        at org.locationtech.geomesa.core.data.AccumuloDataStore.getSchema(AccumuloDataStore.scala:703)
        at org.locationtech.geomesa.core.data.AccumuloDataStore.getSchema(AccumuloDataStore.scala:701)

After some debugging efforts, I've pretty much determined that this error is occurring because of the use of the core.index package object.  If you look inside the getSchema function inside AccumuloDataStore you'll see it is used several times to retrieve a few strings and call a function. 

It is like the core.index package object hasn't been initialized. 

Any ideas for how I can get getSchema to work?  Also, is it possible for you to post a complete solution to the Spark Example on GitHub?

 

Thanks

-- Adam

 


Sent from my iPad



_______________________________________________
geomesa-users mailing list
geomesa-users@xxxxxxxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
http://www.locationtech.org/mailman/listinfo/geomesa-users

