/**
 * Manual smoke test for writing to and re-reading a GeoMesa HBase feature type through Spark.
 *
 * Ensures the `insert_data4` feature type exists (seeding one feature when it is first created),
 * then repeatedly: reads the type through the `geomesa` Spark data source, converts the rows back
 * into `SimpleFeature`s, saves them with their existing feature IDs, and reads the type again,
 * printing row counts at each step.
 */
object InsertDataTest {
  Logger.getLogger("org").setLevel(Level.ERROR)

  /** Number of read / convert / save / re-read rounds performed by `main`. */
  private val Iterations = 20

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("InsertDataTest").master("local[*]").getOrCreate()
    try {
      val sftName = "insert_data4"
      val dsParams: Map[String, String] = Map(
        "hbase.catalog" -> "test",
        "hbase.zookeepers" -> "development01:2181,development02:2181,development03:2181"
      )

      val ds = DataStoreFinder.getDataStore(dsParams.asJava)
      // DataStoreFinder returns null (instead of throwing) when no factory accepts the parameters
      require(ds != null, s"No DataStore could be found for parameters: $dsParams")
      try {
        ensureSchema(ds, sftName)
        (0 until Iterations).foreach(_ => runRound(spark, dsParams, sftName))
      } finally {
        // Release the store's resources even if a round fails
        ds.dispose()
      }
    } finally {
      spark.stop()
    }
  }

  /**
   * Creates the feature type `sftName` (`i:Integer,s:String`) if `ds` does not have it yet, and
   * writes a single seed feature into the newly created type.
   *
   * @param ds      data store to check and, if needed, create the schema in
   * @param sftName feature type name
   */
  private def ensureSchema(ds: DataStore, sftName: String): Unit = {
    if (ds.getSchema(sftName) == null) {
      ds.createSchema(SimpleFeatureTypes.createType(sftName, "i:Integer,s:String"))
      val fw = ds.getFeatureWriterAppend(sftName, Transaction.AUTO_COMMIT)
      try {
        val sf = fw.next()
        val currentTime = System.currentTimeMillis()
        // `i` is bound to Integer and epoch millis overflow an Int, so store epoch seconds instead
        sf.setAttribute("i", Int.box((currentTime / 1000L).toInt))
        sf.setAttribute("s", s"data-$currentTime")
        fw.write()
      } finally {
        fw.close()
      }
    }
  }

  /**
   * Loads `sftName` through the `geomesa` Spark data source and registers it as temp view
   * `viewName`, replacing any existing view with that name.
   */
  private def registerView(
      spark: SparkSession,
      dsParams: Map[String, String],
      sftName: String,
      viewName: String): Unit =
    spark.read
      .format("geomesa")
      .options(dsParams)
      .option("geomesa.feature", sftName)
      .load()
      .createOrReplaceTempView(viewName)

  /**
   * Performs one round: query `sftName`, convert the rows back to features, save them, then
   * re-query the type and print the counts.
   */
  private def runRound(spark: SparkSession, dsParams: Map[String, String], sftName: String): Unit = {
    registerView(spark, dsParams, sftName, sftName)
    val queryDF = spark.sql(s"select i,s from $sftName")
    println(s"***********queryDF: ${queryDF.count()}")

    // Spark SQL schema of the query result, used to map each Row back onto the feature type
    val schema = queryDF.schema
    val rddToSave: RDD[SimpleFeature] = queryDF.rdd.mapPartitions { partition =>
      // Resolved inside the task, once per partition, rather than shipped from the driver
      val sft = WithStore[DataStore](dsParams)(_.getSchema(sftName))
      val mappings = SparkUtils.rowsToFeatures(sft, schema)
      partition.map { row =>
        val sf = mappings.apply(row)
        // Ask the writer to keep the feature IDs read back from the store instead of generating new ones
        sf.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE)
        sf
      }
    }
    println(s"***********rddToSave: ${rddToSave.count()}")
    GeoMesaSpark(dsParams.asJava).save(rddToSave, dsParams, sftName)

    val reQueryView = s"${sftName}2"
    registerView(spark, dsParams, sftName, reQueryView)
    val reQueryDF = spark.sql(s"select * from $reQueryView")
    println(s"***********reQueryDF: ${reQueryDF.count()}")
    println("====================================================")
  }
}