Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
S
spring-boot-kafka-producer
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Uday Singh
spring-boot-kafka-producer
Commits
1377eef3
Commit
1377eef3
authored
May 29, 2020
by
Uday Singh
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'PE-105_component_test' into 'master'
Pe 105 component test See merge request
!2
parents
7f355b96
5379ff40
Changes
10
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
124 additions
and
171 deletions
+124
-171
build.gradle
build.gradle
+2
-3
ProducerConfiguration.java
...sdataproducer/appconfiguration/ProducerConfiguration.java
+16
-24
BoToDtoMapper.java
.../safeway/epe/offersdataproducer/mapper/BoToDtoMapper.java
+0
-8
OffersDataProducer.java
...y/epe/offersdataproducer/producer/OffersDataProducer.java
+16
-9
OffersDataProducerResource.java
...fersdataproducer/resource/OffersDataProducerResource.java
+5
-4
OffersDataProducerService.java
...offersdataproducer/service/OffersDataProducerService.java
+3
-3
OffersDataProducerServiceImpl.java
...aproducer/service/impl/OffersDataProducerServiceImpl.java
+11
-5
OffersDataDTO.json
...afeway/epe/offersdataproducer/resource/OffersDataDTO.json
+1
-0
OffersDataProducerResourceComponentTest.java
...cer/resource/OffersDataProducerResourceComponentTest.java
+70
-29
OffersDataProducerResourceTest.java
...dataproducer/resource/OffersDataProducerResourceTest.java
+0
-86
No files found.
build.gradle
View file @
1377eef3
...
...
@@ -17,20 +17,19 @@ ext {
}
dependencies
{
implementation
'org.springframework.boot:spring-boot-starter-web'
implementation
'org.springframework.kafka:spring-kafka'
compile
group:
'org.json'
,
name:
'json'
,
version:
'20180813'
testImplementation
(
'org.springframework.boot:spring-boot-starter-test'
)
{
//exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
testImplementation
'org.springframework.kafka:spring-kafka-test'
compile
group:
'com.fasterxml'
,
name:
'jackson-module-json-org'
,
version:
'0.9.1'
implementation
"org.projectlombok:lombok:${lombokVersion}"
annotationProcessor
"org.projectlombok:lombok:${lombokVersion}"
implementation
"org.mapstruct:mapstruct:${mapstructVersion}"
annotationProcessor
"org.mapstruct:mapstruct-processor:${mapstructVersion}"
compile
group:
'com.google.guava'
,
name:
'guava'
,
version:
'28.2-jre'
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
compile
group:
'org.apache.kafka'
,
name:
'kafka-clients'
,
version:
'2.5.0'
testImplementation
'org.springframework.kafka:spring-kafka-test'
}
...
...
src/main/java/com/safeway/epe/offersdataproducer/appconfiguration/ProducerConfiguration.java
View file @
1377eef3
...
...
@@ -4,32 +4,24 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Qualifier
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.kafka.core.DefaultKafkaProducerFactory
;
import
org.springframework.kafka.core.KafkaTemplate
;
import
org.springframework.kafka.core.ProducerFactory
;
import
org.springframework.stereotype.Component
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
@Co
nfiguration
@Co
mponent
public
class
ProducerConfiguration
{
public
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ProducerConfiguration
.
class
);
@Bean
public
ProducerFactory
<
String
,
String
>
producerFactory
()
{
logger
.
info
(
"Setting producer configuration "
);
Map
<
String
,
Object
>
config
=
new
HashMap
<>();
config
.
put
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
AppConfigs
.
BOOTSTRAP_SERVER
);
config
.
put
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
);
config
.
put
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
);
return
new
DefaultKafkaProducerFactory
<>(
config
);
}
@Bean
@Qualifier
(
"kafkaTemplate"
)
public
KafkaTemplate
<
String
,
String
>
kafkaTemplate
()
{
return
new
KafkaTemplate
<>(
producerFactory
());
public
Properties
configuration
(){
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
AppConfigs
.
BOOTSTRAP_SERVER
);
properties
.
setProperty
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
properties
.
setProperty
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
.
getName
());
return
properties
;
}
}
}
src/main/java/com/safeway/epe/offersdataproducer/mapper/BoToDtoMapper.java
View file @
1377eef3
...
...
@@ -11,29 +11,21 @@ import org.mapstruct.Mapping;
public
interface
BoToDtoMapper
{
@Mapping
(
target
=
"offersDTO"
,
source
=
"offersBO"
)
OffersDataDTO
map
(
OffersDataBO
offersDataBO
);
@Mapping
(
target
=
"rulesDTO"
,
source
=
"rulesBO"
)
@Mapping
(
target
=
"infoDTO"
,
source
=
"infoBO"
)
OffersDTO
map
(
OffersBO
offersBO
);
@Mapping
(
target
=
"idDTO"
,
source
=
"idBO"
)
@Mapping
(
target
=
"terminalsDTO"
,
source
=
"terminalsBO"
)
InfoDTO
map
(
InfoBO
infoBO
);
IdDTO
map
(
IdBO
idBO
);
@Mapping
(
target
=
"benefitDTO"
,
source
=
"benefitBO"
)
RulesDTO
map
(
RulesBO
rulesBO
);
@Mapping
(
target
=
"pointsDTO"
,
source
=
"pointsBO"
)
@Mapping
(
target
=
"discountDTO"
,
source
=
"discountBO"
)
BenefitDTO
map
(
BenefitBO
BenefitBO
);
@Mapping
(
target
=
"discountTierDTO"
,
source
=
"discountTierBO"
)
DiscountDTO
map
(
DiscountBO
DiscountBO
);
DiscountTierDTO
map
(
DiscountTierBO
discountTierBO
);
PointsDTO
map
(
PointsBO
pointsBO
);
}
src/main/java/com/safeway/epe/offersdataproducer/producer/OffersDataProducer.java
View file @
1377eef3
...
...
@@ -3,26 +3,33 @@ package com.safeway.epe.offersdataproducer.producer;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.safeway.epe.offersdataproducer.appconfiguration.AppConfigs
;
import
com.safeway.epe.offersdataproducer.appconfiguration.ProducerConfiguration
;
import
com.safeway.epe.offersdataproducer.dtomodel.OffersDataDTO
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.clients.producer.RecordMetadata
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.kafka.core.KafkaTemplate
;
import
org.springframework.kafka.support.SendResult
;
import
org.springframework.stereotype.Component
;
import
org.springframework.util.concurrent.ListenableFuture
;
import
java.util.concurrent.Future
;
@Component
public
class
OffersDataProducer
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
OffersDataProducer
.
class
);
@Autowired
private
KafkaTemplate
<
String
,
String
>
kafkaTemplate
;
private
ProducerConfiguration
producerConfiguration
;
@Autowired
private
ObjectMapper
objectMapper
;
public
ListenableFuture
<
SendResult
<
String
,
String
>>
sendMessage
(
OffersDataDTO
offersDataDTO
)
throws
JsonProcessingException
{
logger
.
info
(
String
.
format
(
"$$ -> Producing message --> %s"
,
offersDataDTO
));
String
offersDataBOResp
=
objectMapper
.
writeValueAsString
(
offersDataDTO
);
ListenableFuture
<
SendResult
<
String
,
String
>>
message
=
kafkaTemplate
.
send
(
AppConfigs
.
TOPIC_NAME
,
offersDataBOResp
);
return
message
;
public
Future
sendEvent
(
OffersDataDTO
offersDataDTO
)
throws
JsonProcessingException
{
logger
.
info
(
"Start :: OffersDataProducer.sendEvent"
);
String
offersDataDTOResp
=
objectMapper
.
writeValueAsString
(
offersDataDTO
);
KafkaProducer
producer
=
new
KafkaProducer
(
producerConfiguration
.
configuration
());
ProducerRecord
record
=
new
ProducerRecord
(
AppConfigs
.
TOPIC_NAME
,
offersDataDTOResp
);
Future
<
RecordMetadata
>
future
=
producer
.
send
(
record
);
logger
.
info
(
"End :: OffersDataProducer.sendEvent"
);
return
future
;
}
}
src/main/java/com/safeway/epe/offersdataproducer/resource/OffersDataProducerResource.java
View file @
1377eef3
...
...
@@ -4,13 +4,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import
com.safeway.epe.offersdataproducer.appconfiguration.AppConfigs
;
import
com.safeway.epe.offersdataproducer.domodel.OffersDataDO
;
import
com.safeway.epe.offersdataproducer.service.OffersDataProducerService
;
import
org.apache.kafka.clients.producer.RecordMetadata
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.kafka.support.SendResult
;
import
org.springframework.util.concurrent.ListenableFuture
;
import
org.springframework.web.bind.annotation.*
;
import
java.util.concurrent.Future
;
@RestController
@RequestMapping
(
AppConfigs
.
CLASS_LEVEL_PRODUCER_URL
)
public
class
OffersDataProducerResource
{
...
...
@@ -18,8 +19,8 @@ public class OffersDataProducerResource {
private
OffersDataProducerService
offersDataProducerService
;
@PostMapping
(
AppConfigs
.
CLASS_LEVEL_PRODUCER_URL
)
@ResponseStatus
(
HttpStatus
.
CREATED
)
public
ResponseEntity
<
ListenableFuture
<
SendResult
<
String
,
String
>
>>
publishOfferEvent
(
@RequestBody
OffersDataDO
offersDataDO
)
throws
JsonProcessingException
{
ListenableFuture
<
SendResult
<
String
,
String
>
>
offerEventStatus
=
offersDataProducerService
.
publishOfferEvent
(
offersDataDO
);
public
ResponseEntity
<
Future
<
RecordMetadata
>>
publishOfferEvent
(
@RequestBody
OffersDataDO
offersDataDO
)
throws
JsonProcessingException
{
Future
<
RecordMetadata
>
offerEventStatus
=
offersDataProducerService
.
publishOfferEvent
(
offersDataDO
);
return
ResponseEntity
.
status
(
HttpStatus
.
CREATED
).
body
(
offerEventStatus
);
}
}
src/main/java/com/safeway/epe/offersdataproducer/service/OffersDataProducerService.java
View file @
1377eef3
...
...
@@ -2,9 +2,9 @@ package com.safeway.epe.offersdataproducer.service;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.safeway.epe.offersdataproducer.domodel.OffersDataDO
;
import
org.springframework.kafka.support.SendResult
;
import
org.springframework.util.concurrent.Listenable
Future
;
import
java.util.concurrent.
Future
;
public
interface
OffersDataProducerService
{
ListenableFuture
<
SendResult
<
String
,
String
>>
publishOfferEvent
(
OffersDataDO
offersDataDO
)
throws
JsonProcessingException
;
public
Future
publishOfferEvent
(
OffersDataDO
offersDataDO
)
throws
JsonProcessingException
;
}
src/main/java/com/safeway/epe/offersdataproducer/service/impl/OffersDataProducerServiceImpl.java
View file @
1377eef3
...
...
@@ -8,13 +8,17 @@ import com.safeway.epe.offersdataproducer.producer.OffersDataProducer;
import
com.safeway.epe.offersdataproducer.service.OffersDataProducerService
;
import
com.safeway.epe.offersdataproducer.transformer.OffersDataBoToDTO
;
import
com.safeway.epe.offersdataproducer.transformer.OffersDataDoToBo
;
import
org.apache.kafka.clients.producer.RecordMetadata
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.kafka.support.SendResult
;
import
org.springframework.stereotype.Service
;
import
org.springframework.util.concurrent.ListenableFuture
;
import
java.util.concurrent.Future
;
@Service
public
class
OffersDataProducerServiceImpl
implements
OffersDataProducerService
{
public
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
OffersDataProducerServiceImpl
.
class
);
@Autowired
private
OffersDataBoToDTO
offersDataBoToDTO
;
@Autowired
...
...
@@ -22,10 +26,12 @@ public class OffersDataProducerServiceImpl implements OffersDataProducerService
@Autowired
private
OffersDataProducer
offersDataProducer
;
@Override
public
ListenableFuture
<
SendResult
<
String
,
String
>>
publishOfferEvent
(
OffersDataDO
offersDataDO
)
throws
JsonProcessingException
{
public
Future
publishOfferEvent
(
OffersDataDO
offersDataDO
)
throws
JsonProcessingException
{
logger
.
info
(
"Start :: OffersDataProducerServiceImpl.publishOfferEvent"
);
OffersDataBO
offersDataBO
=
offersDataDoToBo
.
transerToBo
(
offersDataDO
);
OffersDataDTO
offersDataDTO
=
offersDataBoToDTO
.
transferToDTO
(
offersDataBO
);
ListenableFuture
<
SendResult
<
String
,
String
>>
eventStatus
=
offersDataProducer
.
sendMessage
(
offersDataDTO
);
return
eventStatus
;
Future
<
RecordMetadata
>
future
=
offersDataProducer
.
sendEvent
(
offersDataDTO
);
logger
.
info
(
"End :: OffersDataProducerServiceImpl.publishOfferEvent"
);
return
future
;
}
}
src/test/java/com/safeway/epe/offersdataproducer/resource/OffersDataDTO.json
0 → 100644
View file @
1377eef3
{
"offerId"
:
135918555
,
"offersDTO"
:
[
{
"infoDTO"
:
{
"offerId"
:
135918555
,
"idDTO"
:
{
"offerId"
:
135918555
,
"manufacturerId"
:
"MMM"
},
"offerProgramCode"
:
"SC"
,
"terminalsDTO"
:
[
"Groceryworks (Safeway.com)"
,
"Smart Cart (QVS Virtual Terminals)"
,
"Bakery"
]
},
"rulesDTO"
:
{
"offerId"
:
135918555
,
"startDate"
:
"2019-07-27T07:00:00.000+00:00"
,
"endDate"
:
"2021-02-27T06:59:58.000+00:00"
,
"benefitDTO"
:
{
"benefitValueType"
:
"A"
,
"discountDTO"
:
[
{
"allowNegative"
:
false
,
"flexNegative"
:
false
,
"discountTierDTO"
:
[
{
"amount"
:
6.99
,
"upTo"
:
"3"
,
"itemLimit"
:
5
,
"weightLimit"
:
2
}
]
}
],
"pointsDTO"
:
[
{
"generalPoints"
:
5
,
"loyaltyPoints"
:
250
}
]
}
}
}
]
}
\ No newline at end of file
src/test/java/com/safeway/epe/offersdataproducer/resource/OffersDataProducerResourceComponentTest.java
View file @
1377eef3
This diff is collapsed.
Click to expand it.
src/test/java/com/safeway/epe/offersdataproducer/resource/OffersDataProducerResourceTest.java
deleted
100644 → 0
View file @
7f355b96
package
com
.
safeway
.
epe
.
offersdataproducer
.
resource
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.google.common.collect.Lists
;
import
com.safeway.epe.offersdataproducer.appconfiguration.AppConfigs
;
import
com.safeway.epe.offersdataproducer.domodel.*
;
import
com.safeway.epe.offersdataproducer.service.OffersDataProducerService
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.clients.producer.RecordMetadata
;
import
org.apache.kafka.common.TopicPartition
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.InjectMocks
;
import
org.mockito.Mock
;
import
org.mockito.Mockito
;
import
org.skyscreamer.jsonassert.JSONAssert
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.test.context.SpringBootTest
;
import
org.springframework.boot.test.web.client.TestRestTemplate
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.kafka.support.SendResult
;
import
org.springframework.test.context.ActiveProfiles
;
import
org.springframework.test.context.junit4.SpringRunner
;
import
org.springframework.util.concurrent.ListenableFuture
;
import
org.springframework.util.concurrent.SettableListenableFuture
;
import
java.util.concurrent.ExecutionException
;
@RunWith
(
SpringRunner
.
class
)
@SpringBootTest
(
webEnvironment
=
SpringBootTest
.
WebEnvironment
.
RANDOM_PORT
)
// for restTemplate
@ActiveProfiles
(
"test"
)
public
class
OffersDataProducerResourceTest
{
@InjectMocks
private
OffersDataProducerResource
offersDataProducerResource
;
@Mock
private
OffersDataProducerService
offersDataProducerService
;
@Autowired
private
TestRestTemplate
testRestTemplate
;
@Test
public
void
publishOfferEventTest
()
throws
JsonProcessingException
,
ExecutionException
,
InterruptedException
{
OffersDataDO
offersDataDO
=
buildOffersDataDO
();
ObjectMapper
mapper
=
new
ObjectMapper
();
String
excpected
=
mapper
.
writeValueAsString
(
offersDataDO
);
SettableListenableFuture
future
=
new
SettableListenableFuture
();
ProducerRecord
<
Integer
,
String
>
producerRecord
=
new
ProducerRecord
(
AppConfigs
.
TOPIC_NAME
,
null
,
offersDataDO
);
RecordMetadata
recordMetadata
=
new
RecordMetadata
(
new
TopicPartition
(
AppConfigs
.
TOPIC_NAME
,
0
),
8
,
1
,
342
,
System
.
currentTimeMillis
(),
-
1
,
581
);
SendResult
<
Integer
,
String
>
sendResult
=
new
SendResult
<
Integer
,
String
>(
producerRecord
,
recordMetadata
);
future
.
set
(
sendResult
);
Mockito
.
when
(
offersDataProducerService
.
publishOfferEvent
(
Mockito
.
any
(
OffersDataDO
.
class
)))
.
thenReturn
(
future
);
ResponseEntity
<
ListenableFuture
<
SendResult
<
String
,
String
>>>
result
=
offersDataProducerResource
.
publishOfferEvent
(
offersDataDO
);
SendResult
<
String
,
String
>
sendResult1
=
result
.
getBody
().
get
();
assert
sendResult1
.
getRecordMetadata
().
partition
()
==
0
;
String
actual
=
mapper
.
writeValueAsString
(
result
.
getBody
().
get
().
getProducerRecord
().
value
());
JSONAssert
.
assertEquals
(
excpected
,
actual
,
false
);
}
private
OffersDataDO
buildOffersDataDO
()
{
DiscountTierDO
discountTierDO
=
DiscountTierDO
.
builder
().
amount
(
6.99f
).
upTo
(
"3"
)
.
itemLimit
(
5
).
weightLimit
(
2
).
build
();
PointsDO
pointsDO
=
PointsDO
.
builder
().
generalPoints
(
5
).
loyaltyPoints
(
250
).
build
();
DiscountDO
discountDO
=
DiscountDO
.
builder
().
discountTierDO
(
Lists
.
newArrayList
(
discountTierDO
))
.
allowNegative
(
false
).
flexNegative
(
false
).
build
();
BenefitDO
benefitDO
=
BenefitDO
.
builder
().
discountDO
(
Lists
.
newArrayList
(
discountDO
))
.
pointsDO
(
Lists
.
newArrayList
(
pointsDO
)).
benefitValueType
(
"A"
)
.
build
();
IdDO
idDO
=
IdDO
.
builder
().
manufacturerId
(
"MMM"
).
offerId
(
135918444
).
build
();
InfoDO
infoDO
=
InfoDO
.
builder
().
offerId
(
135918444
).
idDO
(
idDO
).
offerProgramCode
(
"SC"
)
.
terminalsDO
(
Lists
.
newArrayList
(
"Groceryworks (Safeway.com)"
,
"Smart Cart (QVS Virtual Terminals)"
,
"Bakery"
)).
build
();
RulesDO
rulesDO
=
RulesDO
.
builder
().
offerId
(
135918444
).
benefitDO
(
benefitDO
).
endDate
(
"2021-02-27T06:59:58.000+00:00"
)
.
startDate
(
"2019-07-27T07:00:00.000+00:00"
).
build
();
OffersDO
offersDO
=
OffersDO
.
builder
().
infoDO
(
infoDO
).
rulesDO
(
rulesDO
).
build
();
OffersDataDO
offersDataDO
=
OffersDataDO
.
builder
().
offerId
(
135918444
).
offersDO
(
Lists
.
newArrayList
(
offersDO
)).
build
();
return
offersDataDO
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment