package com.simple.sumit.kafka;

import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

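/**
 * Kafka consumer that reads tweets from the "twitter_tweets" topic and
 * bulk-inserts them into Elasticsearch, committing offsets manually after
 * each batch for at-least-once delivery.
 */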
@Slf4j
public class ElasticSearchConsumerBulkInsert {
    public static void main(String[] args) throws IOException, InterruptedException {
        log.info("starting main function....");
        String topic="twitter_tweets";
        RestHighLevelClient client= createClient();

        KafkaConsumer<String,String> consumer= createConsumer(topic);
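        //each poll returns a batch (capped by max.poll.records); grouping the
        //inserts into one bulk request cuts round trips to Elasticsearch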
        //poll for new data
        while (true){
            //build a fresh bulk request per poll so earlier batches are not re-sent
            BulkRequest bulkRequest = new BulkRequest();
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
            log.info("Received \t " + records.count() + " records");
            int recordCount = records.count();
            for (ConsumerRecord<String,String> record : records){
                //build an index request for each tweet and add it to the bulk request
                try{
                    String id = extractIdFromTweet(record.value());

                    IndexRequest indexRequest= new IndexRequest(
                            "twitter",
                            "tweets",
                            id //using the tweet id as the document id makes the consumer idempotent
                    ).source(record.value(), XContentType.JSON);
                    bulkRequest.add(indexRequest);
                }catch (Exception ex){
                    //covers a missing "id_str" (NullPointerException) as well as malformed JSON
                    log.warn("skipping bad data:\t" + record.value());
                }
            }
            if (recordCount > 0){
                try {
                    BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
                    if (bulkResponse.hasFailures()){
                        log.warn("bulk request completed with failures: " + bulkResponse.buildFailureMessage());
                    }
                } catch (Exception e){
                    log.error(e.getMessage());
                }
                log.info("Committing the offsets");
                consumer.commitAsync();
                log.info("Offsets committed");
            }

        }
        //unreachable while the loop above runs forever
        //client.close();


    }
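    /**
     * Builds a RestHighLevelClient pointed at a hosted Elasticsearch cluster,
     * authenticating over HTTPS with basic credentials.
     */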
    public static RestHighLevelClient createClient(){
        //https://w3i4gt895z:q375xgades@kafka-project-4013611778.us-east-1.bonsaisearch.net:443
        String hostname="kafka-project-4013611778.us-east-1.bonsaisearch.net";
        String username="w3i4gt895z";
        String password="q375xgades";

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username,password));
        //HttpClientConfigCallback is a functional interface, so a lambda replaces the anonymous class
        RestClientBuilder builder = RestClient.builder(new HttpHost(hostname, 443, "https"))
                .setHttpClientConfigCallback(httpAsyncClientBuilder ->
                        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }
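    /**
     * Builds a KafkaConsumer subscribed to the given topic, with auto-commit
     * disabled so offsets can be committed manually after each bulk insert.
     */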
    public static KafkaConsumer<String,String> createConsumer(String topic){
        String bootstrapServer="127.0.0.1:9092";
        String groupId="second_group";
        Properties properties=new Properties();

        //create consumer config
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");//disable auto commit offset
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");//read at most 10 records per poll

        //create consumer
        KafkaConsumer<String,String> consumer= new KafkaConsumer<>(properties);

        //subscribe consumer to our topic
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }
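    /** Shared Gson parser plus a helper that pulls the tweet id out of its JSON payload. */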
    private static final JsonParser jsonParser = new JsonParser();

    public static String extractIdFromTweet(String tweet){
        //a NullPointerException propagates to the caller when "id_str" is missing,
        //so the caller can skip the bad record instead of indexing it under a bogus id
        return jsonParser.parse(tweet)
                .getAsJsonObject()
                .get("id_str")
                .getAsString();
    }

}