package kafka.sec01;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

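/**
 * Consumer group demo: joins the group "second_group", subscribes to
 * "second_topic", and polls in a loop, logging each record's key, value,
 * partition, and offset.
 */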
public class ConsumerDemoGroup {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemoGroup.class);
        String bootstrapServer = "127.0.0.1:9092";
        String groupId = "second_group";
        Properties properties = new Properties();
        String topic = "second_topic";

        // create consumer config
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // consumers sharing the same group.id split the topic's partitions among themselves
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // "latest" reads only messages produced after the consumer starts; use "earliest" to read from the beginning
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // subscribe the consumer to the topic
        consumer.subscribe(Arrays.asList(topic));

        // poll for new data
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key: " + record.key() + "\tVal: " + record.value());
                logger.info("Partition: " + record.partition() + "\tOffset: " + record.offset());
            }
        }

    }

}