Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
K
kafka-producer
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Abhishek Mondal
kafka-producer
Commits
cc3df273
Commit
cc3df273
authored
Feb 21, 2020
by
Abhishek Mondal
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Added asyn features
parent
a39570c3
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
34 additions
and
3 deletions
+34
-3
KafkaRestController.java
...er/demo/kafkaproducer/controller/KafkaRestController.java
+2
-0
KafkaMessagePublisher.java
...cer/demo/kafkaproducer/service/KafkaMessagePublisher.java
+32
-3
No files found.
src/main/java/com/kafka/producer/demo/kafkaproducer/controller/KafkaRestController.java
View file @
cc3df273
...
@@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
...
@@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.MediaType
;
import
org.springframework.http.MediaType
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.util.StringUtils
;
import
org.springframework.web.bind.annotation.*
;
import
org.springframework.web.bind.annotation.*
;
@Slf4j
@Slf4j
...
@@ -28,6 +29,7 @@ public class KafkaRestController {
...
@@ -28,6 +29,7 @@ public class KafkaRestController {
@PostMapping
(
value
=
"/sendEmployeeData"
,
consumes
=
MediaType
.
APPLICATION_JSON_VALUE
)
@PostMapping
(
value
=
"/sendEmployeeData"
,
consumes
=
MediaType
.
APPLICATION_JSON_VALUE
)
public
ResponseEntity
<
String
>
sendEmployeeData
(
@RequestBody
Employee
employee
)
{
public
ResponseEntity
<
String
>
sendEmployeeData
(
@RequestBody
Employee
employee
)
{
//log.info("Employee :: " +employee);
//log.info("Employee :: " +employee);
if
(!
StringUtils
.
isEmpty
(
employee
))
publisher
.
sendEmployeeData
(
employee
);
publisher
.
sendEmployeeData
(
employee
);
return
new
ResponseEntity
(
"Employee Data sent successfully."
,
HttpStatus
.
OK
);
return
new
ResponseEntity
(
"Employee Data sent successfully."
,
HttpStatus
.
OK
);
}
}
...
...
src/main/java/com/kafka/producer/demo/kafkaproducer/service/KafkaMessagePublisher.java
View file @
cc3df273
package
com
.
kafka
.
producer
.
demo
.
kafkaproducer
.
service
;
package
com
.
kafka
.
producer
.
demo
.
kafkaproducer
.
service
;
import
com.kafka.producer.demo.kafkaproducer.model.Employee
;
import
com.kafka.producer.demo.kafkaproducer.model.Employee
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Qualifier
;
import
org.springframework.beans.factory.annotation.Qualifier
;
import
org.springframework.beans.factory.annotation.Required
;
import
org.springframework.beans.factory.annotation.Required
;
import
org.springframework.kafka.core.KafkaTemplate
;
import
org.springframework.kafka.core.KafkaTemplate
;
import
org.springframework.kafka.support.SendResult
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
import
org.springframework.util.concurrent.ListenableFuture
;
import
org.springframework.util.concurrent.ListenableFutureCallback
;
import
java.util.UUID
;
@Service
@Service
public
class
KafkaMessagePublisher
{
public
class
KafkaMessagePublisher
{
private
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
KafkaMessagePublisher
.
class
);
@Autowired
@Autowired
private
KafkaTemplate
<
String
,
String
>
kafkaTemplate
;
private
KafkaTemplate
<
String
,
String
>
kafkaTemplate
;
...
@@ -17,14 +26,34 @@ public class KafkaMessagePublisher {
...
@@ -17,14 +26,34 @@ public class KafkaMessagePublisher {
@Qualifier
(
"CustomKafkaTemplate"
)
@Qualifier
(
"CustomKafkaTemplate"
)
private
KafkaTemplate
<
String
,
Employee
>
customKafkaTemplate
;
private
KafkaTemplate
<
String
,
Employee
>
customKafkaTemplate
;
String
kafkaTopic
=
"testTopic"
;
String
topicName
=
"testTopic"
;
public
void
sendMessage
(
String
message
)
{
public
void
sendMessage
(
String
message
)
{
kafkaTemplate
.
send
(
kafkaTopic
,
message
);
//Fire and Forgot
//kafkaTemplate.send(topicName, message);
//Fire & Expect Result
ListenableFuture
<
SendResult
<
String
,
String
>>
future
=
kafkaTemplate
.
send
(
topicName
,
UUID
.
randomUUID
().
toString
(),
message
);
future
.
addCallback
(
new
ListenableFutureCallback
<
SendResult
<
String
,
String
>>()
{
@Override
public
void
onSuccess
(
SendResult
<
String
,
String
>
result
)
{
LOGGER
.
info
(
"Sent message=["
+
message
+
"] with offset=["
+
result
.
getRecordMetadata
().
offset
()
+
"]"
);
}
@Override
public
void
onFailure
(
Throwable
ex
)
{
LOGGER
.
info
(
"Unable to send message=["
+
message
+
"] due to : "
+
ex
.
getMessage
());
}
});
}
}
public
void
sendEmployeeData
(
Employee
emp
)
{
public
void
sendEmployeeData
(
Employee
emp
)
{
customKafkaTemplate
.
send
(
kafkaTopic
,
emp
);
customKafkaTemplate
.
send
(
topicName
,
emp
);
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment