Commit 0fd2f1dd authored by Lokesh Singh's avatar Lokesh Singh

getting live matches details

parent d0a0d44e
...@@ -9,4 +9,6 @@ ...@@ -9,4 +9,6 @@
"homeScore": 1, "homeScore": 1,
"awayScore": 1 "awayScore": 1
}' }'
``` ```
\ No newline at end of file 10 April
Task: Getting live matches details from rapid api live score. publishing live matches details in kafka topic and consuming it using server sent event.
\ No newline at end of file
...@@ -5,25 +5,17 @@ import org.apache.kafka.clients.producer.ProducerRecord; ...@@ -5,25 +5,17 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord; import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import java.io.IOException; import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@Configuration @Configuration
public class AsyncHttpClientConfig { public class AsyncHttpClientConfig {
...@@ -49,14 +41,13 @@ public class AsyncHttpClientConfig { ...@@ -49,14 +41,13 @@ public class AsyncHttpClientConfig {
kafkaSender.close(); kafkaSender.close();
} }
@Bean @Bean
public void sendNewsInKafka() throws IOException { public void sendLiveMatchesInKafka() throws IOException {
AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(); AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
asyncHttpClient.prepare("GET", "https://livescore6.p.rapidapi.com/matches/v2/list-live?Category=soccer&Timezone=-7") asyncHttpClient.prepare("GET", "https://livescore6.p.rapidapi.com/matches/v2/list-live?Category=soccer&Timezone=-7")
.setHeader("X-RapidAPI-Key", "0b715b060amsh3056d92625990d9p1fe679jsn29ba530ad93c") .setHeader("X-RapidAPI-Key", "0b715b060amsh3056d92625990d9p1fe679jsn29ba530ad93c")
.setHeader("X-RapidAPI-Host", "livescore6.p.rapidapi.com") .setHeader("X-RapidAPI-Host", "livescore6.p.rapidapi.com")
.execute() .execute()
.toCompletableFuture() .toCompletableFuture()
// .thenAccept(System.out::println)
.thenAccept(response -> { .thenAccept(response -> {
String body = response.getResponseBody(); String body = response.getResponseBody();
System.out.println(body); System.out.println(body);
...@@ -79,10 +70,10 @@ public class AsyncHttpClientConfig { ...@@ -79,10 +70,10 @@ public class AsyncHttpClientConfig {
asyncHttpClient.close(); asyncHttpClient.close();
} }
//
public static void main(String[] args) throws IOException { // public static void main(String[] args) throws IOException {
AsyncHttpClientConfig ob = new AsyncHttpClientConfig(BOOTSTRAP_SERVERS); // AsyncHttpClientConfig ob = new AsyncHttpClientConfig(BOOTSTRAP_SERVERS);
ob.sendNewsInKafka(); // ob.sendLiveMatchesInKafka();
//
} // }
} }
package com.lokesh.sse.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
@RestController
@RequestMapping("/api/v2")
public class LiveMatchController {
@Autowired
KafkaReceiver<String,String> kafkaReceiver;
@GetMapping(value = "/live-matches", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux getLiveMatchEventsFlux(){
Flux<ReceiverRecord<String,String>> kafkaFlux = kafkaReceiver.receive();
return kafkaFlux.checkpoint("Messages are started being consumed")
.log()
.doOnNext(r -> r.receiverOffset().acknowledge())
.map(ReceiverRecord::value).checkpoint("Messages are done consumed");
}
}
package com.lokesh.sse.controller; package com.lokesh.sse.controller;
import com.lokesh.sse.configuration.AsyncHttpClientConfig;
import com.lokesh.sse.configuration.WebClientConfiguration;
import com.lokesh.sse.model.LiveScore; import com.lokesh.sse.model.LiveScore;
import com.lokesh.sse.service.LiveScoreHandler; import com.lokesh.sse.service.LiveScoreHandler;
import org.asynchttpclient.request.body.generator.FeedListener; import org.asynchttpclient.request.body.generator.FeedListener;
...@@ -27,8 +25,6 @@ public class LiveScoreController { ...@@ -27,8 +25,6 @@ public class LiveScoreController {
private final LiveScoreHandler processor; private final LiveScoreHandler processor;
private AsyncHttpClientConfig asyncHttpClientConfig;
public LiveScoreController(LiveScoreHandler processor) { public LiveScoreController(LiveScoreHandler processor) {
this.processor = processor; this.processor = processor;
} }
...@@ -52,8 +48,4 @@ public class LiveScoreController { ...@@ -52,8 +48,4 @@ public class LiveScoreController {
.retry(Duration.ofMillis(200)) .retry(Duration.ofMillis(200))
.build()); .build());
} }
// @GetMapping("/news")
// public Flux<ServerSentEvent<Object>> getNews() {
// return Flux.create();
// }
} }
package com.lokesh.sse.webclient; package com.lokesh.sse.webclient;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import java.io.IOException;
public class LiveScoreClient { public class LiveScoreClient {
public static void main(String[] args) throws IOException {
// LiveScoreClient ob = new LiveScoreClient();
// System.out.println(ob.response);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment