Hi, I wrote unit tests using org.apache.accumulo.minicluster.MiniAccumuloCluster, and in the tests everything works fine with my code. But the problem with reading from the database remains.
package ru.company.gps.stats;
import io.helidon.config.Config;
import io.helidon.config.ConfigSources;
import lombok.extern.slf4j.Slf4j;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.NamespaceExistsException;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.Transaction;
import org.geotools.data.simple.SimpleFeatureWriter;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.geotools.util.factory.Hints;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStoreParams;
import org.locationtech.geomesa.utils.interop.SimpleFeatureTypes;
import org.locationtech.geomesa.utils.stats.EnumerationStat;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Point;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import scala.Option;
import scala.collection.JavaConverters;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static ru.netris.gps.geo.SimpleFeatureUtils.GEOMETRY_FACTORY;
import static ru.netris.gps.geo.SimpleFeatureUtils.TYPE;
@Slf4j
public class StatsTest {

  /**
   * Feature-type spec shared by the string-schema tests: indexed point geometry plus the
   * signal attributes, with per-attribute stats enabled on "cam".
   */
  private static final String SIGNAL_SPEC =
      "*geo:Point,time:Date:index=true,cam:String:keep-stats=true:index=true,imei:String,dir:Double,alt:Double,vlc:Double,sl:Integer,ds:Integer,dir_y:Double,poi_azimuth_x:Double,poi_azimuth_y:Double";

  private AccumuloDataStore ds;
  private GeomesaServiceCachedImpl sut;
  private SimpleFeatureType sft;

  /**
   * Boots a fresh MiniAccumuloCluster and opens an {@link AccumuloDataStore} against it.
   * A null data store means the GeoMesa Accumulo factory was not found on the classpath,
   * so fail fast here instead of NPE-ing inside each test.
   */
  @BeforeEach
  public void setUp()
      throws AccumuloException, AccumuloSecurityException, IOException, InterruptedException, NamespaceExistsException {
    MiniCluster miniCluster = new MiniCluster();
    MiniAccumuloCluster cluster = miniCluster.startCluster();
    Map<String, String> dsParams = new HashMap<>(10);
    dsParams.put(AccumuloDataStoreParams.InstanceIdParam().key,
        cluster.getInstanceName());
    dsParams.put(AccumuloDataStoreParams.ZookeepersParam().key,
        cluster.getZooKeepers());
    dsParams.put(AccumuloDataStoreParams.UserParam().key, miniCluster.user);
    dsParams.put(AccumuloDataStoreParams.PasswordParam().key, miniCluster.password);
    dsParams.put(AccumuloDataStoreParams.CatalogParam().key, "myNamespace." + "geomesa");
    ds = (AccumuloDataStore) DataStoreFinder.getDataStore(dsParams);
    Assertions.assertNotNull(ds,
        "AccumuloDataStore lookup failed - check GeoMesa/Accumulo test classpath");
  }

  /** Builds the service under test with the standard query configuration used by all tests. */
  private GeomesaServiceCachedImpl createSut() {
    BroadcastService broadcast = mock(BroadcastService.class);
    Config config = Config.create(ConfigSources.create(
        Map.of("query.period", "2592000", "query.maxCoordinatesWithoutSampling", "1000",
            "query.maxCameraCount", "50")));
    return new GeomesaServiceCachedImpl(ds, broadcast, config);
  }

  /** Creates the canonical test event for the given camera id and IMEI. */
  private static VsvnEvent newEvent(String cam, String imei) {
    return new VsvnEvent(cam, 1234567L, new Latitude(57.33, LatType.North),
        new Longitude(33.22, LonType.East), imei, 13.0,
        new Azimuth(123.0, null), null, null);
  }

  /**
   * Writes a single event straight through a GeoMesa feature writer (bypassing the SUT).
   * The writer is closed via try-with-resources so a failed attribute set cannot leak it.
   * The feature id is generated by the writer; the previous hand-built id was never used.
   */
  private void writeEventDirectly(VsvnEvent event) throws IOException {
    try (SimpleFeatureWriter writer =
        ds.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {
      SimpleFeature sf = writer.next();
      // Heuristic: values that fit in an int are treated as epoch seconds and scaled to millis.
      long time =
          event.getTime() > (long) Integer.MAX_VALUE ? event.getTime() : event.getTime() * 1000L;
      // if altitude > 0, add altitude to coordinate ?
      Point point = GEOMETRY_FACTORY.createPoint(
          new Coordinate(event.getLon().getValue(), event.getLat().getValue())); // Point(X, Y)
      sf.setAttribute("geo", point);
      sf.setAttribute("time", new java.util.Date(time));
      sf.setAttribute("cam", event.getCam());
      sf.setAttribute("imei", event.getImei());
      sf.setAttribute("dir", event.getDir());
      sf.setAttribute("alt", event.getAlt());
      sf.setAttribute("vlc", event.getVlc());
      sf.setAttribute("sl", event.getSl());
      sf.setAttribute("ds", event.getDs().toInt());
      // event is statically a VsvnEvent, so the old instanceof checks were always true
      sf.setAttribute("dir_y", event.getDirV());
      sf.setAttribute("poi_azimuth_x", event.getPoi());
      sf.setAttribute("poi_azimuth_y", event.getPoiV());
      writer.write();
    }
  }

  @Test
  public void successTestUsingGeneratedSchemaWithoutSut() throws IOException, CQLException {
    ds.createSchema(TYPE);
    // FIX: sft was never assigned before, causing an NPE at sft.getTypeName() below.
    sft = ds.getSchema(TYPE.getTypeName());
    sut = createSut();
    writeEventDirectly(newEvent("cam", "imei"));
    Option<Object> count = ds.stats().getCount(sft, ECQL.toFilter("cam='cam'"), true, new Hints());
    Assertions.assertFalse(count.isEmpty());
    assertEquals(1L, count.get());
  }

  @Test
  public void successWriteTwoEventsAndGetStatisticsUsingSUT() throws CQLException {
    ds.createSchema(TYPE);
    // FIX: sft was never assigned before, causing an NPE in the stats queries below.
    sft = ds.getSchema(TYPE.getTypeName());
    sut = createSut();
    ArrayList<GeoEvent> eventList = new ArrayList<>(2);
    eventList.add(newEvent("cam", "imei"));
    eventList.add(newEvent("cam2", "imei2"));
    sut.writeData(eventList);
    Option<Object> count = ds.stats().getCount(sft, ECQL.toFilter("cam='cam2'"), false,
        new Hints());
    Assertions.assertFalse(count.isEmpty());
    assertEquals(1L, count.get());
    Option<EnumerationStat<Object>> cam = ds.stats().getEnumeration(sft, "cam",
        ECQL.toFilter("1=1"), true);
    scala.collection.mutable.Map<Object, Object> enumeration = cam.get().enumeration();
    Map<Object, Object> camCount = JavaConverters.mapAsJavaMap(enumeration);
    assertEquals(2, camCount.size());
  }

  @Test
  public void successTestWithSUTWriteWithGenerationStringSchema() throws IOException, CQLException {
    String sftName = "SignalBuilder";
    ds.createSchema(SimpleFeatureTypes.createType(sftName, SIGNAL_SPEC));
    // reload the sft from the ds to ensure all user data is set properly
    sft = ds.getSchema(sftName);
    sut = createSut();
    ArrayList<GeoEvent> eventList = new ArrayList<>(1);
    eventList.add(newEvent("cam", "imei"));
    sut.writeData(eventList);
    Option<Object> count = ds.stats().getCount(sft, ECQL.toFilter("cam='cam'"), false, new Hints());
    Assertions.assertFalse(count.isEmpty());
    assertEquals(1L, count.get());
  }

  @Test
  public void successTestWithoutUsingSutWithGenerationStringSchema()
      throws IOException, CQLException {
    String sftName = "testSchema";
    ds.createSchema(SimpleFeatureTypes.createType(sftName, SIGNAL_SPEC));
    // reload the sft from the ds to ensure all user data is set properly
    sft = ds.getSchema(sftName);
    writeEventDirectly(newEvent("cam", "imei"));
    Option<Object> count = ds.stats().getCount(sft, ECQL.toFilter("cam='cam'"), false, new Hints());
    Assertions.assertFalse(count.isEmpty());
    assertEquals(1L, count.get());
  }

  @Test
  public void successTestForDummySchemaWithGenerationStringSchema()
      throws IOException, CQLException {
    String spec = "name:String:index=join,age:Int:keep-stats=true,height:Int,dtg:Date,*geom:Point:srid=4326";
    String sftName = "testSchema";
    ds.createSchema(SimpleFeatureTypes.createType(sftName, spec));
    sft = ds.getSchema(
        sftName);// reload the sft from the ds to ensure all user data is set properly
    try (SimpleFeatureWriter writer =
        ds.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {
      SimpleFeature sf = writer.next();
      sf.setAttribute(0, "beta");
      sf.setAttribute(1, 11);
      sf.setAttribute(2, 11);
      sf.setAttribute(3, "2016-01-04T00:00:00.000Z");
      sf.setAttribute(4, "POINT (0 0)");
      writer.write();
    }
    Option<Object> count = ds.stats().getCount(sft, ECQL.toFilter("name='beta'"), false,
        new Hints());
    Assertions.assertFalse(count.isEmpty());
    assertEquals(1L, count.get());
  }

  @Test
  public void successBigTestWith1000DataBy3Cameras() throws CQLException {
    ds.createSchema(TYPE);
    sft = ds.getSchema(TYPE.getTypeName());
    sut = createSut();
    // Three cameras, 1000 events each, flushed in three batches (at i==400, i==560, and the tail)
    // to exercise repeated writeData calls against the same schema.
    for (int j = 1; j < 4; j++) {
      ArrayList<GeoEvent> geoEvents = new ArrayList<>();
      for (int i = 0; i < 1000; i++) {
        VsvnEvent event =
            new VsvnEvent("cam" + j, 1234567L, new Latitude(10.33 + 0.05 * i, LatType.North),
                new Longitude(9.22 + 0.01 * i, LonType.East), "imei" + j, 13.0,
                new Azimuth(123.0, null), null, null);
        geoEvents.add(event);
        if (i == 400 || i == 560) {
          sut.writeData(geoEvents);
          geoEvents.clear();
        }
      }
      sut.writeData(geoEvents);
    }
    Option<Object> countByCam1 = ds.stats().getCount(sft, ECQL.toFilter("cam='cam1'"), false,
        new Hints());
    Assertions.assertFalse(countByCam1.isEmpty());
    assertEquals(1000L, countByCam1.get());
    Option<Object> countByCam2 = ds.stats().getCount(sft, ECQL.toFilter("cam='cam2'"), false,
        new Hints());
    Assertions.assertFalse(countByCam2.isEmpty());
    assertEquals(1000L, countByCam2.get());
    Option<Object> countByCam3 = ds.stats().getCount(sft, ECQL.toFilter("cam='cam3'"), false,
        new Hints());
    Assertions.assertFalse(countByCam3.isEmpty());
    assertEquals(1000L, countByCam3.get());
    Option<EnumerationStat<Object>> cam = ds.stats().getEnumeration(sft, "cam",
        ECQL.toFilter("1=1"), true);
    scala.collection.mutable.Map<Object, Object> enumeration = cam.get().enumeration();
    Map<Object, Object> camCount = JavaConverters.mapAsJavaMap(enumeration);
    assertEquals(3, camCount.size());
    // Compare the estimated (cached-stats) count against the exact (scan) count for a bbox.
    Option<Object> countByBboxEstimated = ds.stats().getCount(sft, ECQL.toFilter(
        "bbox(geo,10,50,19,60)"), false, new Hints());
    Option<Object> countByBboxCalculated = ds.stats().getCount(sft, ECQL.toFilter(
        "bbox(geo,10,50,19,60)"), true, new Hints());
    log.info("Estimated count by bbox = {}", countByBboxEstimated.get());
    log.info("Precise count by bbox = {}", countByBboxCalculated.get());
  }
}
And here is how I run the cluster:
package ru.company.gps.stats;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.NamespaceExistsException;
import org.apache.accumulo.core.security.NamespacePermission;
import org.apache.accumulo.core.security.SystemPermission;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.locationtech.geomesa.accumulo.AccumuloVersion;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
/**
 * Test helper that boots an in-process MiniAccumuloCluster in a temp directory,
 * creates the working namespace, and grants the test user full rights on it.
 */
class MiniCluster {
  public final String namespace = "myNamespace";
  public final String user = "root";
  public final String password = "secret";

  public MiniCluster() {}

  /**
   * Starts the mini cluster and returns it ready for use.
   *
   * <p>Order matters here: the ZooKeeper config tweaks must be appended after the
   * cluster is constructed (which lays down conf/zoo.cfg) but before {@code start()}.
   *
   * @return the running cluster; the caller is responsible for stopping it
   */
  public MiniAccumuloCluster startCluster()
      throws IOException, AccumuloException, AccumuloSecurityException, InterruptedException, NamespaceExistsException {
    Path tempDir = Files.createTempDirectory("myNamespace-mini-acc-");
    MiniAccumuloConfig miniConfig = new MiniAccumuloConfig(tempDir.toFile(), password);
    MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniConfig);

    // Disable the ZK admin server and open the four-letter-word commands before startup.
    File zooCfg = new File(tempDir.toFile(), "conf/zoo.cfg");
    try (FileWriter zooCfgWriter = new FileWriter(zooCfg, true)) {
      zooCfgWriter.write("admin.enabledServer=false\n");
      zooCfgWriter.write("4lw.commands.whitelist=*\n");
    }

    accumulo.start();

    // Create the namespace and grant the test user everything GeoMesa needs on it.
    Connector connector = accumulo.getConnector(user, password);
    connector.namespaceOperations().create(namespace);
    connector.securityOperations().grantSystemPermission(user, SystemPermission.CREATE_NAMESPACE);
    connector.securityOperations().grantSystemPermission(user, SystemPermission.ALTER_NAMESPACE);
    connector.securityOperations().grantSystemPermission(user, SystemPermission.DROP_NAMESPACE);
    connector.securityOperations().grantNamespacePermission(user, namespace, NamespacePermission.READ);
    connector.securityOperations().grantNamespacePermission(user, namespace, NamespacePermission.WRITE);
    connector.securityOperations().grantNamespacePermission(user, namespace, NamespacePermission.CREATE_TABLE);
    connector.securityOperations().grantNamespacePermission(user, namespace, NamespacePermission.ALTER_TABLE);
    connector.securityOperations().grantNamespacePermission(user, namespace, NamespacePermission.DROP_TABLE);
    AccumuloVersion.close(connector);
    return accumulo;
  }
}
Then I thought that the problem was not in my code but in the database.
Today I installed single-node Accumulo with ZooKeeper and Hadoop on a local VMware machine. (In Docker I couldn't run an image with Accumulo — Accumulo would not start.)
But a system-wide GeoMesa install did not work:
/opt/accumulo/lib/ext/geomesa-accumulo-distributed-runtime_2.12-3.2.2.jar
and I added the path to this jar to the Accumulo classpath in /opt/accumulo/conf/accumulo-env.sh:
CLASSPATH="${CLASSPATH}:${lib}/*:${HADOOP_CONF_DIR}:${ZOOKEEPER_HOME}/*:${ZOOKEEPER_HOME}/lib/*:${HADOOP_HOME}/share/hadoop/client/*:${CLASSPATH}:${lib}/ext/*"
I restarted the Accumulo cluster, but it did not help: my app can't start.
After this I installed the GeoMesa library jar like this:
$ hadoop fs -mkdir /accumulo/classpath/myNamespace
$ hadoop fs -put \
geomesa-accumulo_${VERSION}/dist/accumulo/geomesa-accumulo-distributed-runtime_${VERSION}.jar \
/accumulo/classpath/myNamespace/
and then
$ accumulo shell -u root
> createnamespace myNamespace
> grant Namespace.CREATE_TABLE -ns myNamespace -u myUser
> config -s general.vfs.context.classpath.myNamespace=hdfs://NAME_NODE_FDQN:9000/accumulo/classpath/myNamespace/[^.].*.jar
> config -ns myNamespace -s table.classpath.context=myNamespace
And after this my app can run and start. But the result is the same: the statistics are not counted correctly per device.
I thought that maybe the problem was a version mismatch: the app used GeoMesa 3.2.0, while the library I installed into Accumulo was version 3.2.2, and the geomesa-accumulo utils (which I use to count geo-events per cam) were also 3.2.2. So I upgraded my app's GeoMesa version to 3.2.2, but that did not help either.
Maybe the problem is in my data-source parameters for Accumulo? Do I need parameters that control the Accumulo connection, such as thread count or buffer size? I currently use the following:
// Data-store parameters for the standalone Accumulo instance.
// FIX: the original paste had line breaks INSIDE the string literals for
// "accumulo.instance.id" and the zookeepers address, so the instance id and the
// ZK host would contain a newline and the connection could never resolve.
Properties params = new Properties();
params.put("accumulo.instance.id", "accumulo");
params.put("accumulo.zookeepers", "192.168.81.130:2181");
params.put("accumulo.catalog", "myNamespace.geomesa");
params.put("accumulo.user", "root");
params.put("accumulo.password", "password");
params.put("geomesa.stats.generate", true);
params.put("accumulo.remote.stats.enable", true);
params.put("geomesa.query.audit", true);
// NOTE(review): "geomesa.query.timeout" normally takes a duration, not a boolean —
// verify against the GeoMesa data-store parameter docs.
params.put("geomesa.query.timeout", true);
params.put("geomesa.query.caching", true);
params.put("geomesa.partition.scan.parallel", true);

// Log every registered factory so a missing GeoMesa SPI on the classpath is visible.
Iterator<DataStoreFactorySpi> stores = DataStoreFinder.getAvailableDataStores();
while (stores.hasNext()) {
  DataStoreFactorySpi spi = stores.next();
  log.info(
      "DataStoreFactorySpi Name: " + spi.getDisplayName() + ", Descr: " + spi.getDescription());
}
DataStore ds = DataStoreFinder.getDataStore(params);
ds.createSchema(SimpleFeatureUtils.TYPE);
return ds;