package geomesa.sensor;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import geomesa.core.index.Constants;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.DataUtilities;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeatureType;

import java.util.*;

public class SensorIngest {

    static String INSTANCE_ID = "instanceId";
    static String ZOOKEEPERS = "zookeepers";
    static String USER = "user";
    static String PASSWORD = "password";
    static String AUTHS = "auths";
    static String TABLE_NAME = "tableName";
    static String FEATURE_NAME = "featureName";
    static String INGEST_FILE = "ingestFile";
    static String[] ACCUMULO_CONNECTION_PARAMS = new String[]{INSTANCE_ID,
            ZOOKEEPERS, USER, PASSWORD, AUTHS, TABLE_NAME};

    static Options getCommonRequiredOptions() {
        Options options = new Options();
        Option instanceIdOpt = OptionBuilder.withArgName(INSTANCE_ID)
                                         .hasArg()
                                         .isRequired()
                                         .withDescription("accumulo connection parameter instanceId")
                                         .create(INSTANCE_ID);
        Option zookeepersOpt = OptionBuilder.withArgName(ZOOKEEPERS)
                                         .hasArg()
                                         .isRequired()
                                         .withDescription("accumulo connection parameter zookeepers")
                                         .create(ZOOKEEPERS);
        Option userOpt = OptionBuilder.withArgName(USER)
                                         .hasArg()
                                         .isRequired()
                                         .withDescription("accumulo connection parameter user")
                                         .create(USER);
        Option passwordOpt = OptionBuilder.withArgName(PASSWORD)
                                         .hasArg()
                                         .isRequired()
                                         .withDescription("accumulo connection parameter password")
                                         .create(PASSWORD);
        Option authsOpt = OptionBuilder.withArgName(AUTHS)
                                         .hasArg()
                                         .withDescription("accumulo connection parameter auths")
                                         .create(AUTHS);
        Option tableNameOpt = OptionBuilder.withArgName(TABLE_NAME)
                                         .hasArg()
                                         .isRequired()
                                         .withDescription("accumulo connection parameter tableName")
                                         .create(TABLE_NAME);
        Option featureNameOpt = OptionBuilder.withArgName(FEATURE_NAME)
                                         .hasArg()
                                         .isRequired()
                                         .withDescription("name of feature in accumulo table")
                                         .create(FEATURE_NAME);
        options.addOption(instanceIdOpt);
        options.addOption(zookeepersOpt);
        options.addOption(userOpt);
        options.addOption(passwordOpt);
        options.addOption(authsOpt);
        options.addOption(tableNameOpt);
        options.addOption(featureNameOpt);
        return options;
    }

    public static Map<String, String> getAccumuloDataStoreConf(CommandLine cmd) {
        Map<String , String> dsConf = new HashMap<String , String>();
        for (String param : ACCUMULO_CONNECTION_PARAMS) {
            dsConf.put(param, cmd.getOptionValue(param));
        }
        if (dsConf.get(AUTHS) == null) dsConf.put(AUTHS, "");
        return dsConf;
    }

	private static void _(String s) {
		System.out.println("("+s+")");
	}

    public static void main(String [ ] args) throws Exception {
		_("SensorIngest.main@010 ...");
        CommandLineParser parser = new BasicParser();
        Options options = getCommonRequiredOptions();
        Option ingestFileOpt = OptionBuilder.withArgName(INGEST_FILE)
                                         .hasArg()
                                         .isRequired()
                                         .withDescription("ingest tsv file on hdfs")
                                         .create(INGEST_FILE);
        options.addOption(ingestFileOpt);

        CommandLine cmd = parser.parse( options, args);
        Map<String, String> dsConf = getAccumuloDataStoreConf(cmd);

        String featureName = cmd.getOptionValue(FEATURE_NAME); // e.g. "sensor"
        SimpleFeatureType featureType = buildFeatureType(featureName);

        DataStore ds = DataStoreFinder.getDataStore(dsConf);
        ds.createSchema(featureType);

        runMapReduceJob(featureName, 
			"GeoMesa SensorEvent Ingest", 
			SensorIngestMapper.class,
            dsConf,
            new Path(cmd.getOptionValue(INGEST_FILE)));
		_("SensorIngest.main@090 SUCCESS...");
    }

    private static void runMapReduceJob(String featureName, 
										String jobName, Class mapper,
                                        Map<String, String> dsConf,
                                        Path mapredCSVFilePath) throws Exception {

		_("SensorIngest.runMapReduceJob@010 ...");
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(mapper); // e.g., SensorIngestMapper.class
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setJobName(jobName);
        Configuration conf = job.getConfiguration();
        for (String key : dsConf.keySet()) {
            conf.set(key, dsConf.get(key));
        }
        conf.set(FEATURE_NAME, featureName);
        FileSystem fs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, mapredCSVFilePath);
        Path tmpPath = new Path("///tmp");
        if (!fs.exists(tmpPath)) {
            fs.create(tmpPath);
        }
        Path outputDir = new Path("///tmp", "geomesa-"+featureName+"-output"); 
        if (fs.exists(outputDir)) {
          // remove this directory, if it already exists
            fs.delete(outputDir, true);
        }
        Path hdfsJarPath = new Path("///tmp", "geomesa-"+featureName+"-1.0-SNAPSHOT.jar");
        if (fs.exists(hdfsJarPath)) {
          // remove this jar, if it already exists
            fs.delete(hdfsJarPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        fs.copyFromLocalFile(new Path("target/geomesa-"+featureName+"-1.0-SNAPSHOT.jar"), hdfsJarPath);
        for (FileStatus path : fs.listStatus(hdfsJarPath)) {
            job.addArchiveToClassPath(new Path(path.getPath().toUri().getPath()));
        }

        job.submit();

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }

		_("SensorIngest.runMapReduceJob@090 return ...");
    }


    private static SimpleFeatureType buildFeatureType(String featureName) throws SchemaException {
        String name = featureName;
        String spec = Joiner.on(",").join(attributes);
		_("SimpleFeatureType@010: name='"+name+"' spec=["+spec+"]");
		SimpleFeatureType featureType = DataUtilities.createType(name, spec);
		featureType.getUserData().put(Constants.SF_PROPERTY_START_TIME, "Time"); // Tells GeoMesa to use "Time" as the Start Time index
        return featureType;
    }

    static List<String> attributes = Lists.newArrayList(
		"EventID:Integer",
		"Sensor:Integer",
		"Time:Date",			// Date => a "Unix" time (e.g., millis since 1970.01.01)
		"SensorGeo_Lat:Float",
		"SensorGeo_Long:Float",
		"Altitude:Integer",
		"MinAxis:Float",
		"MajAxis:Float",
		"Orientation:Float",
		"*geom:Point:srid=4326" // SRID 4326 => standard coordinate system
	);
}

