Commit 13e15b97 authored by lmethuku's avatar lmethuku

kafka producer consumer

parents
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file

java:S6813"BRemove this field injection and use constructor injection instead.(8ԝ2J$7ef0d711-d7da-4fe5-9e8d-d73d9aad8032
m java:S106"+Replace this use of System.out by a logger.(8ԝ2J$f755dda8-c15f-4f6f-8110-03e349ee1020
l java:S106"+Replace this use of System.out by a logger.(c8ԝ2J$25d394ca-86b2-4491-bb59-a484c044d4fd
r java:S106"+Replace this use of System.err by a logger.(Ӝ8ԝ2J$652c9130-a14c-4af1-b3ae-444440eaddd9
\ No newline at end of file
…
java:S6813"BRemove this field injection and use constructor injection instead.(êæÄÞ8¬ä¯Ú2J$76851c00-4eac-4fea-82ad-ca87e97d4d66
…
java:S6813"BRemove this field injection and use constructor injection instead.(êæÄÞ8¬ä¯Ú2J$4a7a8a3d-02e0-4538-a63c-1453fe1a7ed9
o
java:S1602%",Remove useless curly braces around statement(óÑç•8±ä¯Ú2J$fab18bfa-71e3-4d6d-92f8-5ae9164f94f1
\ No newline at end of file
m java:S106"+Replace this use of System.out by a logger.(͊8۳2J$273c2cea-a265-46bf-9bc3-f5fd23b7f34f
\ No newline at end of file
r java:S106"+Replace this use of System.out by a logger.(蕯8ě֝2J$9b0434ac-9d8c-4444-a457-6b44683f6008
\ No newline at end of file
…
java:S6813"BRemove this field injection and use constructor injection instead.(êæÄÞ8›ÔªÚ2J$7be9cde3-cf44-4b99-af2f-ed08cea8b6b8
\ No newline at end of file
\
,src/main/java/com/visa/application/Main.java,e\4\e4a8b1d377a4b327b700c82cdc170510bab01518
7
pom.xml,4\4\442292b8a7efeabbe4cc176709b833b1792140ec
t
Dsrc/main/java/com/visa/application/service/KafkaConsumerService.java,6\9\6942edf71f4d5bbfe3de41146a3a098df035c3a4
r
Bsrc/main/java/com/visa/application/repository/OrderRepository.java,6\c\6c3a36cb85925918c697fcb755d30e67c9d5d86c
d
4src/main/java/com/visa/application/entity/Order.java,1\7\17b747299bcdd044deceeea2c1f1ccbe46a3dedd
e
5src/main/java/com/visa/application/dto/OrderItem.java,2\8\28b7fb56b2d55850d9cad5da62dd392a0c5fc207
r
Bsrc/main/java/com/visa/application/config/KafkaProducerConfig.java,0\0\008a573ba4edb27f2673e3ea2cb3e9f7255b4f5b
t
Dsrc/main/java/com/visa/application/service/OrderProducerService.java,1\0\100a5b4d1da9838c1688d6b849bdeebb9da4029b
r
Bsrc/main/java/com/visa/application/controller/OrderController.java,e\6\e69564226f562755acddd83c769bac074da24d2c
r
Bsrc/main/java/com/visa/application/config/KafkaConsumerConfig.java,e\4\e42aed864c9d081962e94e9429be16769724d18d
t
Dsrc/main/java/com/visa/application/service/OrderConsumerService.java,0\6\06f0fb69e91138311e9ba8c0ea7dff81d4ae83b9
m
=src/main/java/com/visa/application/KafkaProducerConsumer.java,4\2\425086460f19c2fbe55b0895a151e8d08ce195e3
l
<src/main/java/com/visa/application/NonRepeatedCharacter.java,a\f\afc5553deb5701ec3d8402c163b9aefae268965a
\ No newline at end of file
\
,src/main/java/com/visa/application/Main.java,e\4\e4a8b1d377a4b327b700c82cdc170510bab01518
7
pom.xml,4\4\442292b8a7efeabbe4cc176709b833b1792140ec
t
Dsrc/main/java/com/visa/application/service/KafkaConsumerService.java,6\9\6942edf71f4d5bbfe3de41146a3a098df035c3a4
r
Bsrc/main/java/com/visa/application/repository/OrderRepository.java,6\c\6c3a36cb85925918c697fcb755d30e67c9d5d86c
d
4src/main/java/com/visa/application/entity/Order.java,1\7\17b747299bcdd044deceeea2c1f1ccbe46a3dedd
e
5src/main/java/com/visa/application/dto/OrderItem.java,2\8\28b7fb56b2d55850d9cad5da62dd392a0c5fc207
r
Bsrc/main/java/com/visa/application/config/KafkaProducerConfig.java,0\0\008a573ba4edb27f2673e3ea2cb3e9f7255b4f5b
t
Dsrc/main/java/com/visa/application/service/OrderProducerService.java,1\0\100a5b4d1da9838c1688d6b849bdeebb9da4029b
r
Bsrc/main/java/com/visa/application/controller/OrderController.java,e\6\e69564226f562755acddd83c769bac074da24d2c
r
Bsrc/main/java/com/visa/application/config/KafkaConsumerConfig.java,e\4\e42aed864c9d081962e94e9429be16769724d18d
t
Dsrc/main/java/com/visa/application/service/OrderConsumerService.java,0\6\06f0fb69e91138311e9ba8c0ea7dff81d4ae83b9
m
=src/main/java/com/visa/application/KafkaProducerConsumer.java,4\2\425086460f19c2fbe55b0895a151e8d08ce195e3
l
<src/main/java/com/visa/application/NonRepeatedCharacter.java,a\f\afc5553deb5701ec3d8402c163b9aefae268965a
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
\ No newline at end of file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-mongo-project</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version> <!-- Specify the Spring Boot version here -->
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring Boot WebFlux for reactive web applications -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Spring Data MongoDB Reactive -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<!-- Spring Kafka for Kafka consumption -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Jackson Databind for JSON serialization/deserialization -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Logging dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.visa.application;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaProducerConsumer {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerConsumer.class);
}
}
\ No newline at end of file
package com.visa.application.config;
import com.visa.application.entity.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, Order> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(Order.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Order> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
package com.visa.application.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // Use JSON for message serialization
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.visa.application.controller;
import com.visa.application.service.OrderProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private static final Logger logger = LoggerFactory.getLogger(OrderController.class);
@Autowired
private OrderProducerService orderProducerService;
@GetMapping("/publish")
public Mono<ResponseEntity<String>> publishOrdersToKafka() {
return orderProducerService.publishOrdersToKafka()
.then(Mono.just(ResponseEntity.ok("Orders are being published to Kafka")))
.onErrorResume(e -> {
logger.error("Error publishing orders to Kafka", e);
return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Failed to publish orders to Kafka"));
});
}
}
package com.visa.application.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Document(collection = "order")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
@Id
private String id;
private String orderName;
private String price;
}
package com.visa.application.repository;
import com.visa.application.entity.Order;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Mono;
public interface OrderRepository extends ReactiveMongoRepository<Order, String> {
Mono<Order> findById(String orderId);
}
package com.visa.application.service;
import com.visa.application.entity.Order;
import com.visa.application.repository.OrderRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@EnableKafka
public class OrderConsumerService {
@Autowired
private OrderRepository orderRepository;
// Kafka listener to consume messages from the 'order-topic' topic
@KafkaListener(topics = "order-topic", groupId = "my-consumer-group"/*, containerFactory = "kafkaListenerContainerFactory"*/)
public void consumeOrder(Order order) {
System.out.println("Consumed Order: " + order);
// Save the consumed order to MongoDB
saveOrderToMongo(order).subscribe();
}
private Mono<Order> saveOrderToMongo(Order order) {
return orderRepository.save(order)
.doOnSuccess(savedOrder -> System.out.println("Saved to MongoDB: " + savedOrder))
.doOnError(err -> System.err.println("Error saving to MongoDB: " + err.getMessage()));
}
}
package com.visa.application.service;
import com.visa.application.entity.Order;
import com.visa.application.repository.OrderRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class OrderProducerService {
private static final Logger logger = LoggerFactory.getLogger(OrderProducerService.class);
@Autowired
private OrderRepository orderRepository;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC = "order-topic";
public Mono<Void> publishOrdersToKafka() {
Flux<Order> ordersFlux = orderRepository.findAll();
return ordersFlux
.flatMap(this::sendOrderToKafka)
.then()
.doOnError(e -> logger.error("Error occurred while publishing orders to Kafka", e))
.doOnSuccess(aVoid -> logger.info("All orders successfully published to Kafka"));
}
private Mono<Order> sendOrderToKafka(Order order) {
return Mono.create(sink -> {
kafkaTemplate.send(TOPIC, order.getId(), order)
.addCallback(
result -> {
logger.info("Order with ID {} successfully sent to Kafka", order.getId());
sink.success(order);
},
ex -> {
logger.error("Failed to send order with ID {} to Kafka", order.getId(), ex);
sink.error(ex);
}
);
});
}
}
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
kafka.topic.order=order-topic
kafka.group.order=test-consumer-group
spring.data.mongodb.uri=mongodb://localhost:27017/orders_db
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment