package geomesa.sensor;

import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.identity.FeatureIdImpl;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;

import java.io.IOException;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class SensorIngestMapper extends Mapper<LongWritable,Text,Key,Value> {

	// Columns: EventID, Sensor, Time, Latitude, Longitude, Altitude, MinAxis, MajAxis, Orientation
    private static int ID_COL_IDX = 0;
    private static int TIME_COL_IDX = 2;
    private static int LATITUDE_COL_IDX  = 3;
    private static int LONGITUDE_COL_IDX = 4;
    private static int MINIMUM_NUM_FIELDS = 9;

    private SimpleFeatureType 								featureType = null;
    private FeatureWriter<SimpleFeatureType, SimpleFeature> featureWriter = null;
    private SimpleFeatureBuilder							featureBuilder;
    private GeometryFactory 								geometryFactory = JTSFactoryFinder.getGeometryFactory();

	private static void _(String s) {
		System.out.println("("+s+")");
	}

    public void setup(Mapper<LongWritable,Text,Key,Value>.Context context) throws IOException, InterruptedException {
        super.setup(context);
		_("SensorIngestMapper.setup: ENTER...");

        Map<String , String> connectionParams = new HashMap<String , String>();
        connectionParams.put("instanceId", context.getConfiguration().get("instanceId"));
        connectionParams.put("zookeepers", context.getConfiguration().get("zookeepers"));
        connectionParams.put("user", context.getConfiguration().get("user"));
        connectionParams.put("password", context.getConfiguration().get("password"));
        connectionParams.put("auths", context.getConfiguration().get("auths"));
        connectionParams.put("tableName", context.getConfiguration().get("tableName"));

        String featureName = context.getConfiguration().get("featureName");
		_("SensorIngestMapper.setup: featureName="+featureName);
        DataStore ds = DataStoreFinder.getDataStore(connectionParams);
        featureType = ds.getSchema(featureName);
		_("SensorIngestMapper.setup: featureType="+featureType.getTypeName());
        featureBuilder = new SimpleFeatureBuilder(featureType);
		_("SensorIngestMapper.setup: featureBuilder OK...");
        featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT); 
		_("SensorIngestMapper.setup: RETURN.");
    }

    public void map(LongWritable key, Text value, Mapper<LongWritable,Text,Key,Value>.Context context) {
		_("SensorIngestMapper.map: ENTER key='"+key+"' value=["+value.toString()+"] ...");
        String[] attributes = value.toString().split("\\t", -1);
        if (attributes.length >= MINIMUM_NUM_FIELDS && !attributes[LATITUDE_COL_IDX].equals("") && !attributes[LONGITUDE_COL_IDX].equals("")) {
            featureBuilder.reset();
            featureBuilder.addAll(attributes);

			//_("SensorIngestMapper.map@020: parsing lat = "+attributes[LATITUDE_COL_IDX]+" ...");
            Double lat = Double.parseDouble(attributes[LATITUDE_COL_IDX]);
			//_("SensorIngestMapper.map@030: parsing lon = "+attributes[LONGITUDE_COL_IDX]+" ...");
            Double lon = Double.parseDouble(attributes[LONGITUDE_COL_IDX]);

            Geometry geom = geometryFactory.createPoint(new Coordinate(lon, lat));
            SimpleFeature simpleFeature = featureBuilder.buildFeature(attributes[ID_COL_IDX]);
			Date time = new Date( Long.parseLong(attributes[TIME_COL_IDX]) );
			_("SensorIngestMapper.map@040: parsed time '"+attributes[TIME_COL_IDX]+"': "+time);
            simpleFeature.setAttribute("Time", time);
            simpleFeature.setDefaultGeometry(geom);

            try {
                SimpleFeature next = featureWriter.next();
                for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
                    next.setAttribute(i, simpleFeature.getAttribute(i));
                }
                ((FeatureIdImpl)next.getIdentifier()).setID(simpleFeature.getID());
                featureWriter.write();
					_("SensorIngestMapper.map: feature written ...");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
		_("SensorIngestMapper.map: RETURN ...");
    }
}
