Commit 502d14a4 authored by Kenil Mavani's avatar Kenil Mavani

updating logic 3rd time

parent c9165515
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ProjectRootManager"> <component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
<option value="$PROJECT_DIR$/../springMono/pom.xml" />
<option value="$USER_HOME$/Downloads/loyalty-events-producer-master/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>
\ No newline at end of file
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
<project version="4"> <project version="4">
<component name="ProjectModuleManager"> <component name="ProjectModuleManager">
<modules> <modules>
<module fileurl="file://$USER_HOME$/Downloads/loyalty-events-producer-master/loyalty-events-producer.iml" filepath="$USER_HOME$/Downloads/loyalty-events-producer-master/loyalty-events-producer.iml" />
<module fileurl="file://$PROJECT_DIR$/../springMono/mono.iml" filepath="$PROJECT_DIR$/../springMono/mono.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/spring-webflux-kafka.iml" filepath="$PROJECT_DIR$/.idea/spring-webflux-kafka.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/spring-webflux-kafka.iml" filepath="$PROJECT_DIR$/.idea/spring-webflux-kafka.iml" />
</modules> </modules>
</component> </component>
......
This diff is collapsed.
package com.example.kafka;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class MessageStore {
private final List<String> allMessages = new ArrayList<>();
public void addMessage(String message){
allMessages.add(message);
}
public String getAllMessages(){
return allMessages.toString();
}
}
package com.example.kafka.consumer; package com.example.kafka.consumer;
import com.example.kafka.service.OrderServiceImpl; import com.example.kafka.service.OrderServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class MessageConsumer { public class MessageConsumer {
private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class.getName());
@Autowired @Autowired
private OrderServiceImpl orderService; private OrderServiceImpl orderService;
@KafkaListener(topics = "${app.topic.name}",groupId = "group1") @KafkaListener(topics = "${app.topic.name}",groupId = "group1")
public void readMessage(String order){ public void readMessage(String order){
orderService.validateOrder(order); orderService.validateOrder(order).doOnNext(log.info("order has been validated"));
} }
} }
...@@ -33,5 +33,12 @@ public class UserController { ...@@ -33,5 +33,12 @@ public class UserController {
.defaultIfEmpty(ResponseEntity.notFound().build()); .defaultIfEmpty(ResponseEntity.notFound().build());
} }
@PutMapping("/{userId}")
public Mono<ResponseEntity<User>> updateUserById(@PathVariable String userId, @RequestBody User user){
return userService.updateUser(userId,user)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.badRequest().build());
}
} }
...@@ -9,5 +9,5 @@ public interface OrderService { ...@@ -9,5 +9,5 @@ public interface OrderService {
Mono<Order> findById(String userId); Mono<Order> findById(String userId);
void validateOrder(String order); Mono<Order> validateOrder(String order);
} }
package com.example.kafka.serviceImpl;
import com.example.kafka.entity.Order;
import com.example.kafka.producer.MessageProducer;
import com.example.kafka.repository.OrderRepository;
import com.example.kafka.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private MessageProducer messageProducer;
@Override
public Mono<Order> saveOrderIntoDB(Order order) {
messageProducer.publishOrder(order.toString());
return orderRepository.save(order);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment