Re: [geomesa-users] GeoMesa Kryo serialization failed for SimpleFeature with certain length

Hey Emilio,

Sorry, I will not be able to work on this immediately due to other priorities. But feel free to take my local change and make it a proper commit. 


diff --git a/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/impl/KryoFeatureSerialization.scala b/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/impl/KryoFeatureSerialization.scala
index 192952850..03c76e2f9 100644
--- a/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/impl/KryoFeatureSerialization.scala
+++ b/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/impl/KryoFeatureSerialization.scala
@@ -86,7 +86,7 @@ trait KryoFeatureSerialization extends SimpleFeatureSerializer {
       } else {
         val buffer = output.getBuffer
-        var i = end
+        var i = end - 1
         while (i > offset) {
           buffer(i + shift) = buffer(i)
           i -= 1
diff --git a/geomesa-features/geomesa-feature-kryo/src/test/scala/org/locationtech/geomesa/features/kryo/KryoFeatureSerializerTest.scala b/geomesa-features/geomesa-feature-kryo/src/test/scala/org/locationtech/geomesa/features/kryo/KryoFeatureSerializerTest.scala
index cf9a6e2ad..aa53703fd 100644
--- a/geomesa-features/geomesa-feature-kryo/src/test/scala/org/locationtech/geomesa/features/kryo/KryoFeatureSerializerTest.scala
+++ b/geomesa-features/geomesa-feature-kryo/src/test/scala/org/locationtech/geomesa/features/kryo/KryoFeatureSerializerTest.scala
@@ -412,6 +412,24 @@ class KryoFeatureSerializerTest extends Specification with LazyLogging {
+    "shift buffered feature content in kryo output correctly" in {
+      val spec = "age:Int,name:String,dtg:Date,*geom:Point:srid=4326"
+      val sft = SimpleFeatureTypes.createType("test", spec)
+      val sf = ScalaSimpleFeature.create(sft, "fid-0", "10", null, "2013-01-02T00:00:00.000Z", "POINT(45.0 49.0)")
+      // 130997 is a magic number to make required buffer size equal to 131072 which is the smallest size to trigger
+      // a feature content shift to the very last byte of the buffer.
+      val name = new String(Array.fill(130997)(1.toByte), StandardCharsets.UTF_8)
+      sf.setAttribute("name", name)
+      val serializer = KryoFeatureSerializer(sft, Set.empty[SerializationOption])
+      val serialized = serializer.serialize(sf)
+      serialized.length must beGreaterThan(Short.MaxValue * 2)
+      val deserialized = serializer.deserialize(serialized)
+      deserialized.getAttribute("name") mustEqual name
+      deserialized mustEqual sf
+      deserialized.getUserData.asScala must beEmpty
+    }
     "be backwards compatible" in {
       val spec = "dtg:Date,*geom:Point:srid=4326"
       val sft = SimpleFeatureTypes.createType("testType", spec)

On Wed, Oct 14, 2020 at 3:39 PM Jun Cai <joncai2012@xxxxxxxxx> wrote:
Thanks Emilio for confirming. I have the fix and unit test ready locally. Will create a PR soon. 


On Wed, Oct 14, 2020 at 3:19 PM Emilio Lahr-Vivaz <elahrvivaz@xxxxxxxx> wrote:
Thanks for finding the issue, and taking the time to investigate it so thoroughly! Your solution seems correct; would you be willing to put it up as a PR? If you can share the values in the simple feature you were serializing that caused the error, we could also add a unit test to verify the fix.



On 10/14/20 6:09 PM, Jun Cai wrote:
Hi All,

I am seeing a serialization failure while writing features of a certain length with GeoMesaFeatureWriter. Here is part of the stack trace:
Suppressed: java.lang.IllegalArgumentException: Error indexing feature '1804882973#10760608:MULTIPOLYGON
    at org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter$class.writeFeature(GeoMesaFeatureWriter.scala:55) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
    at org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter$TableFeatureWriter.writeFeature(GeoMesaFeatureWriter.scala:141) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
    at org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter$GeoMesaAppendFeatureWriter$class.write(GeoMesaFeatureWriter.scala:227) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
    at org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter$$anon$3.write(GeoMesaFeatureWriter.scala:108) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
    at org.locationtech.geomesa.utils.geotools.FeatureUtils$.write(FeatureUtils.scala:141) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
Caused by: java.lang.ArrayIndexOutOfBoundsException: 131072
    at org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization$class.writeFeature(KryoFeatureSerialization.scala:91) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
    at org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization$class.serialize(KryoFeatureSerialization.scala:42) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
    at org.locationtech.geomesa.features.kryo.KryoFeatureSerializer$MutableActiveSerializer.serialize(KryoFeatureSerializer.scala:78) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
    at org.locationtech.geomesa.index.api.WritableFeature$FeatureLevelWritableFeature$$anonfun$values$1$$anonfun$apply$1.apply(WritableFeature.scala:154) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
    at org.locationtech.geomesa.index.api.WritableFeature$FeatureLevelWritableFeature$$anonfun$values$1$$anonfun$apply$1.apply(WritableFeature.scala:154) ~[geomesa-hbase-spark-runtime_custom_2.11-2.4.1-20200904.jar:?]
Root Cause
With some debugging, I found that the issue is in KryoFeatureSerialization at lines 89 to 91:
var i = end
while (i > offset) {
buffer(i + shift) = buffer(i)
where it moves the content in the Kryo Output buffer by <shift> bytes, starting from the end position of the current feature content, which is acquired at line 78:
val end = output.position()
The output.position() here is the number of bytes that have not been flushed. In the output buffer, the zero-based index of the last byte is actually (end - 1).
This means the code copies one more byte than needed, which becomes an issue when
output.getBuffer.length = end + shift
because the buffer expansion check at line 82 then evaluates to false
if (output.getBuffer.length < end + shift) {
so the buffer is not expanded, and accessing buffer(i + shift) throws the ArrayIndexOutOfBoundsException.

As an example from my use case: the feature content length is 131057 (end = 131058), shift is 14, and output.getBuffer.length is 131072. When serializing this feature,
the code skipped the buffer expansion and tried to access buffer(131058 + 14), which is buffer(131072), causing the ArrayIndexOutOfBoundsException.
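
For anyone who wants to see the boundary case without GeoMesa or Kryo on the classpath, here is a minimal sketch that uses a plain byte array with the numbers above. ShiftSketch, shiftRight and the offset value of 4 are made up for illustration and are not part of the GeoMesa code; only the loop body mirrors the snippet from lines 89 to 91.

```scala
import scala.util.Try

object ShiftSketch {
  // Hypothetical stand-in for the shift loop; `start` is the initial value of `i`.
  def shiftRight(buffer: Array[Byte], offset: Int, start: Int, shift: Int): Unit = {
    var i = start
    while (i > offset) {
      buffer(i + shift) = buffer(i)
      i -= 1
    }
  }

  def main(args: Array[String]): Unit = {
    val end = 131058                      // value of output.position() in the report
    val shift = 14
    val offset = 4                        // arbitrary assumption, not taken from GeoMesa
    val buffer = new Array[Byte](131072)  // same length as output.getBuffer in the report

    // The expansion check is false here because buffer.length == end + shift.
    println(s"expansion needed: ${buffer.length < end + shift}")

    // Starting at `end` writes to index end + shift, one past the last valid index.
    println(s"start at end fails: ${Try(shiftRight(buffer, offset, end, shift)).isFailure}")

    // Starting at `end - 1` writes to index end + shift - 1 at most, which is in bounds.
    println(s"start at end - 1 succeeds: ${Try(shiftRight(buffer, offset, end - 1, shift)).isSuccess}")
  }
}
```

The sketch only exercises the array indexing; it says nothing about whether the shifted bytes deserialize correctly, which is what the unit test is for.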
Proposed Fix
Change line 89 from
var i = end
to
var i = end - 1

Per my understanding of the code and local testing, this change shouldn't cause any compatibility issues in feature serialization/deserialization, but I would like to get it reviewed by more folks.

Jun Cai
