Commit d0a0d44e authored by Lokesh Singh's avatar Lokesh Singh

sending live matches topic into kafka

parent 82500a85
......@@ -21,7 +21,15 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
......@@ -37,6 +45,15 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.pepegar</groupId>
<artifactId>hammock-asynchttpclient_2.13</artifactId>
<version>0.11.3</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -4,8 +4,8 @@
curl --location --request POST 'http://localhost:8080/api/v1/live-scores' \
--header 'Content-Type: application/json' \
--data-raw '{
"homeTeam": "Arsenal",
"awayTeam": "Tottenham",
"homeTeam": "Lokesh",
"awayTeam": "Netrapal",
"homeScore": 1,
"awayScore": 1
}'
......
package com.lokesh.sse.configuration;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@Configuration
public class AsyncHttpClientConfig {
private static final String TOPIC = "live-matches";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String CLIENT_ID_CONFIG = "news-consumer-rapidapi";
@Autowired
private KafkaSender<String, String> kafkaSender;
public AsyncHttpClientConfig(String bootstrapServers){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
kafkaSender = KafkaSender.create(senderOptions);
}
public void close() {
kafkaSender.close();
}
@Bean
public void sendNewsInKafka() throws IOException {
AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
asyncHttpClient.prepare("GET", "https://livescore6.p.rapidapi.com/matches/v2/list-live?Category=soccer&Timezone=-7")
.setHeader("X-RapidAPI-Key", "0b715b060amsh3056d92625990d9p1fe679jsn29ba530ad93c")
.setHeader("X-RapidAPI-Host", "livescore6.p.rapidapi.com")
.execute()
.toCompletableFuture()
// .thenAccept(System.out::println)
.thenAccept(response -> {
String body = response.getResponseBody();
System.out.println(body);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, null, body);
SenderRecord<String, String, ProducerRecord<String, String>> senderRecord = SenderRecord.create(record, null);
kafkaSender.send(Flux.just(senderRecord))
.doOnNext(result -> {
if (result.exception() != null) {
// Handle sending failure
System.err.println("Failed to send message: " + result.exception().getMessage());
} else {
// Handle sending success
System.out.println("Message sent successfully: " + result.correlationMetadata());
}
})
.subscribe();
})
.thenAccept(System.out::println)
.join();
asyncHttpClient.close();
}
public static void main(String[] args) throws IOException {
AsyncHttpClientConfig ob = new AsyncHttpClientConfig(BOOTSTRAP_SERVERS);
ob.sendNewsInKafka();
}
}
package com.lokesh.sse.configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.DefaultKafkaReceiver;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class ServerSentEventConfig {
private static final String TOPIC = "live-matches";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String CLIENT_ID_CONFIG = "news-consumer-rapidapi";
private static final String GROUP_ID_CONFIG = "news-rapidapi";
@Bean
public KafkaReceiver kafkaReceiver(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_CONFIG);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaReceiver(ConsumerFactory.INSTANCE, ReceiverOptions.create(props).subscription(Collections.singleton(TOPIC)));
}
}
package com.lokesh.sse.configuration;
import org.apache.commons.logging.LogFactory;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.function.client.WebClient;
import java.io.IOException;
@Configuration
@EnableWebFlux
public class WebClientConfiguration implements WebFluxConfigurer {
Logger logger = LoggerFactory.getLogger(WebClientConfiguration.class);
}
package com.lokesh.sse.controller;
import com.lokesh.sse.configuration.AsyncHttpClientConfig;
import com.lokesh.sse.configuration.WebClientConfiguration;
import com.lokesh.sse.model.LiveScore;
import com.lokesh.sse.service.LiveScoreHandler;
import org.asynchttpclient.request.body.generator.FeedListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
@RestController
......@@ -21,6 +27,8 @@ public class LiveScoreController {
private final LiveScoreHandler processor;
private AsyncHttpClientConfig asyncHttpClientConfig;
public LiveScoreController(LiveScoreHandler processor) {
this.processor = processor;
}
......@@ -44,4 +52,8 @@ public class LiveScoreController {
.retry(Duration.ofMillis(200))
.build());
}
// @GetMapping("/news")
// public Flux<ServerSentEvent<Object>> getNews() {
// return Flux.create();
// }
}
package com.lokesh.sse.webclient;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import java.io.IOException;
public class LiveScoreClient {
public static void main(String[] args) throws IOException {
// LiveScoreClient ob = new LiveScoreClient();
// System.out.println(ob.response);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment