Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
K
kafka-reactive-producerAvro
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Pooja
kafka-reactive-producerAvro
Commits
c72018bb
Commit
c72018bb
authored
Sep 18, 2020
by
Pooja
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
kafka-reactive-producerAvro
parents
Changes
14
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
643 additions
and
0 deletions
+643
-0
.gitignore
kafka-reactive-producerAvro/.gitignore
+33
-0
MavenWrapperDownloader.java
...ive-producerAvro/.mvn/wrapper/MavenWrapperDownloader.java
+117
-0
maven-wrapper.jar
kafka-reactive-producerAvro/.mvn/wrapper/maven-wrapper.jar
+0
-0
maven-wrapper.properties
...active-producerAvro/.mvn/wrapper/maven-wrapper.properties
+2
-0
mvnw
kafka-reactive-producerAvro/mvnw
+0
-0
mvnw.cmd
kafka-reactive-producerAvro/mvnw.cmd
+182
-0
pom.xml
kafka-reactive-producerAvro/pom.xml
+106
-0
IKafkaConstants.java
...oducerAvro/kafkareactiveproducerAvro/IKafkaConstants.java
+8
-0
KafkaProducerConfig.java
...erAvro/kafkareactiveproducerAvro/KafkaProducerConfig.java
+49
-0
KafkaReactiveProducerAvroApplication.java
...iveproducerAvro/KafkaReactiveProducerAvroApplication.java
+27
-0
SampleProducer.java
...roducerAvro/kafkareactiveproducerAvro/SampleProducer.java
+94
-0
application.properties
...ve-producerAvro/src/main/resources/application.properties
+1
-0
PersonData.avsc
...tive-producerAvro/src/main/resources/avro/PersonData.avsc
+11
-0
KafkaReactiveProducerAvroApplicationTests.java
...oducerAvro/KafkaReactiveProducerAvroApplicationTests.java
+13
-0
No files found.
kafka-reactive-producerAvro/.gitignore
0 → 100644
View file @
c72018bb
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
kafka-reactive-producerAvro/.mvn/wrapper/MavenWrapperDownloader.java
0 → 100644
View file @
c72018bb
/*
* Copyright 2007-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import
java.net.*
;
import
java.io.*
;
import
java.nio.channels.*
;
import
java.util.Properties
;
public
class
MavenWrapperDownloader
{
private
static
final
String
WRAPPER_VERSION
=
"0.5.6"
;
/**
* Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
*/
private
static
final
String
DEFAULT_DOWNLOAD_URL
=
"https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
+
WRAPPER_VERSION
+
"/maven-wrapper-"
+
WRAPPER_VERSION
+
".jar"
;
/**
* Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
* use instead of the default one.
*/
private
static
final
String
MAVEN_WRAPPER_PROPERTIES_PATH
=
".mvn/wrapper/maven-wrapper.properties"
;
/**
* Path where the maven-wrapper.jar will be saved to.
*/
private
static
final
String
MAVEN_WRAPPER_JAR_PATH
=
".mvn/wrapper/maven-wrapper.jar"
;
/**
* Name of the property which should be used to override the default download url for the wrapper.
*/
private
static
final
String
PROPERTY_NAME_WRAPPER_URL
=
"wrapperUrl"
;
public
static
void
main
(
String
args
[])
{
System
.
out
.
println
(
"- Downloader started"
);
File
baseDirectory
=
new
File
(
args
[
0
]);
System
.
out
.
println
(
"- Using base directory: "
+
baseDirectory
.
getAbsolutePath
());
// If the maven-wrapper.properties exists, read it and check if it contains a custom
// wrapperUrl parameter.
File
mavenWrapperPropertyFile
=
new
File
(
baseDirectory
,
MAVEN_WRAPPER_PROPERTIES_PATH
);
String
url
=
DEFAULT_DOWNLOAD_URL
;
if
(
mavenWrapperPropertyFile
.
exists
())
{
FileInputStream
mavenWrapperPropertyFileInputStream
=
null
;
try
{
mavenWrapperPropertyFileInputStream
=
new
FileInputStream
(
mavenWrapperPropertyFile
);
Properties
mavenWrapperProperties
=
new
Properties
();
mavenWrapperProperties
.
load
(
mavenWrapperPropertyFileInputStream
);
url
=
mavenWrapperProperties
.
getProperty
(
PROPERTY_NAME_WRAPPER_URL
,
url
);
}
catch
(
IOException
e
)
{
System
.
out
.
println
(
"- ERROR loading '"
+
MAVEN_WRAPPER_PROPERTIES_PATH
+
"'"
);
}
finally
{
try
{
if
(
mavenWrapperPropertyFileInputStream
!=
null
)
{
mavenWrapperPropertyFileInputStream
.
close
();
}
}
catch
(
IOException
e
)
{
// Ignore ...
}
}
}
System
.
out
.
println
(
"- Downloading from: "
+
url
);
File
outputFile
=
new
File
(
baseDirectory
.
getAbsolutePath
(),
MAVEN_WRAPPER_JAR_PATH
);
if
(!
outputFile
.
getParentFile
().
exists
())
{
if
(!
outputFile
.
getParentFile
().
mkdirs
())
{
System
.
out
.
println
(
"- ERROR creating output directory '"
+
outputFile
.
getParentFile
().
getAbsolutePath
()
+
"'"
);
}
}
System
.
out
.
println
(
"- Downloading to: "
+
outputFile
.
getAbsolutePath
());
try
{
downloadFileFromURL
(
url
,
outputFile
);
System
.
out
.
println
(
"Done"
);
System
.
exit
(
0
);
}
catch
(
Throwable
e
)
{
System
.
out
.
println
(
"- Error downloading"
);
e
.
printStackTrace
();
System
.
exit
(
1
);
}
}
private
static
void
downloadFileFromURL
(
String
urlString
,
File
destination
)
throws
Exception
{
if
(
System
.
getenv
(
"MVNW_USERNAME"
)
!=
null
&&
System
.
getenv
(
"MVNW_PASSWORD"
)
!=
null
)
{
String
username
=
System
.
getenv
(
"MVNW_USERNAME"
);
char
[]
password
=
System
.
getenv
(
"MVNW_PASSWORD"
).
toCharArray
();
Authenticator
.
setDefault
(
new
Authenticator
()
{
@Override
protected
PasswordAuthentication
getPasswordAuthentication
()
{
return
new
PasswordAuthentication
(
username
,
password
);
}
});
}
URL
website
=
new
URL
(
urlString
);
ReadableByteChannel
rbc
;
rbc
=
Channels
.
newChannel
(
website
.
openStream
());
FileOutputStream
fos
=
new
FileOutputStream
(
destination
);
fos
.
getChannel
().
transferFrom
(
rbc
,
0
,
Long
.
MAX_VALUE
);
fos
.
close
();
rbc
.
close
();
}
}
kafka-reactive-producerAvro/.mvn/wrapper/maven-wrapper.jar
0 → 100644
View file @
c72018bb
File added
kafka-reactive-producerAvro/.mvn/wrapper/maven-wrapper.properties
0 → 100644
View file @
c72018bb
distributionUrl
=
https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
wrapperUrl
=
https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
kafka-reactive-producerAvro/mvnw
0 → 100644
View file @
c72018bb
This diff is collapsed.
Click to expand it.
kafka-reactive-producerAvro/mvnw.cmd
0 → 100644
View file @
c72018bb
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM set title of command window
title %0
@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
)
@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
if exist %WRAPPER_JAR% (
if "%MVNW_VERBOSE%" == "true" (
echo Found %WRAPPER_JAR%
)
) else (
if not "%MVNW_REPOURL%" == "" (
SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
)
if "%MVNW_VERBOSE%" == "true" (
echo Couldn't find %WRAPPER_JAR%, downloading it ...
echo Downloading from: %DOWNLOAD_URL%
)
powershell -Command "&{"^
"$webclient = new-object System.Net.WebClient;"^
"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
"}"^
"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
"}"
if "%MVNW_VERBOSE%" == "true" (
echo Finished downloading %WRAPPER_JAR%
)
)
@REM End of extension
@REM Provide a "standardized" way to retrieve the CLI args that will
@REM work with both Windows and non-Windows executions.
set MAVEN_CMD_LINE_ARGS=%*
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%
kafka-reactive-producerAvro/pom.xml
0 → 100644
View file @
c72018bb
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-parent
</artifactId>
<version>
2.3.3.RELEASE
</version>
<relativePath/>
<!-- lookup parent from repository -->
</parent>
<groupId>
kafka.reactive.producerAvro
</groupId>
<artifactId>
kafka-reactive-producerAvro
</artifactId>
<version>
0.0.1-SNAPSHOT
</version>
<name>
kafka-reactive-producerAvro
</name>
<description>
Demo project for Spring Boot
</description>
<properties>
<java.version>
8
</java.version>
</properties>
<dependencies>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-web
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.avro
</groupId>
<artifactId>
avro
</artifactId>
<version>
1.9.2
</version>
</dependency>
<dependency>
<groupId>
io.projectreactor.kafka
</groupId>
<artifactId>
reactor-kafka
</artifactId>
<version>
1.2.2.RELEASE
</version>
</dependency>
<dependency>
<groupId>
io.projectreactor
</groupId>
<artifactId>
reactor-core
</artifactId>
<version>
3.3.5.RELEASE
</version>
</dependency>
<dependency>
<groupId>
org.apache.kafka
</groupId>
<artifactId>
kafka-clients
</artifactId>
<version>
2.4.0
</version>
</dependency>
<dependency>
<groupId>
io.confluent
</groupId>
<artifactId>
kafka-avro-serializer
</artifactId>
<version>
5.5.0
</version>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-test
</artifactId>
<scope>
test
</scope>
<exclusions>
<exclusion>
<groupId>
org.junit.vintage
</groupId>
<artifactId>
junit-vintage-engine
</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-maven-plugin
</artifactId>
</plugin>
<plugin>
<groupId>
org.apache.avro
</groupId>
<artifactId>
avro-maven-plugin
</artifactId>
<version>
1.8.2
</version>
<executions>
<execution>
<phase>
generate-sources
</phase>
<goals>
<goal>
schema
</goal>
</goals>
<configuration>
<sourceDirectory>
${project.basedir}/src/main/resources/avro/
</sourceDirectory>
<outputDirectory>
${project.build.directory}/generated/avro
</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>
confluent
</id>
<url>
https://packages.confluent.io/maven/
</url>
</repository>
</repositories>
</project>
kafka-reactive-producerAvro/src/main/java/kafka/reactive/producerAvro/kafkareactiveproducerAvro/IKafkaConstants.java
0 → 100644
View file @
c72018bb
package
kafka
.
reactive
.
producerAvro
.
kafkareactiveproducerAvro
;
public
interface
IKafkaConstants
{
String
TOPIC_NAME
=
"Person"
;
String
KAFKA_SERVER
=
"127.0.0.1:9092"
;
String
START_OFFSET
=
"earliest"
;
String
SCHEMA_REGISTRY_URL
=
"http://127.0.0.1:8081"
;
}
kafka-reactive-producerAvro/src/main/java/kafka/reactive/producerAvro/kafkareactiveproducerAvro/KafkaProducerConfig.java
0 → 100644
View file @
c72018bb
package
kafka
.
reactive
.
producerAvro
.
kafkareactiveproducerAvro
;
import
com.genericAvro.PersonData
;
import
io.confluent.kafka.serializers.KafkaAvroSerializer
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.Producer
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Properties
;
@Configuration
public
class
KafkaProducerConfig
{
@Bean
public
Producer
<
String
,
PersonData
>
getProducer
()
{
Properties
properties
=
new
Properties
();
// Kafka Properties
properties
.
setProperty
(
"bootstrap.servers"
,
IKafkaConstants
.
KAFKA_SERVER
);
properties
.
setProperty
(
"acks"
,
"all"
);
properties
.
setProperty
(
"retries"
,
"10"
);
// Avro properties
properties
.
setProperty
(
"key.serializer"
,
StringSerializer
.
class
.
getName
());
properties
.
setProperty
(
"value.serializer"
,
KafkaAvroSerializer
.
class
.
getName
());
properties
.
setProperty
(
"schema.registry.url"
,
IKafkaConstants
.
SCHEMA_REGISTRY_URL
);
Producer
<
String
,
PersonData
>
producer
=
new
KafkaProducer
<
String
,
PersonData
>(
properties
);
return
producer
;
}
/*@Bean
public ProducerFactory<String, PersonData> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_SERVER);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(IKafkaConstants.SCHEMA_REGISTRY_URL, "http://127.0.0.1:8081");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, PersonData> kafkaTemplate() {
return new KafkaTemplate<String, PersonData>(producerFactory());
}*/
}
kafka-reactive-producerAvro/src/main/java/kafka/reactive/producerAvro/kafkareactiveproducerAvro/KafkaReactiveProducerAvroApplication.java
0 → 100644
View file @
c72018bb
package
kafka
.
reactive
.
producerAvro
.
kafkareactiveproducerAvro
;
import
org.springframework.boot.SpringApplication
;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.TimeUnit
;
@SpringBootApplication
public
class
KafkaReactiveProducerAvroApplication
{
private
static
final
String
BOOTSTRAP_SERVERS
=
"localhost:9092"
;
private
static
final
String
TOPIC
=
"topic-avro_1"
;
public
static
void
main
(
String
[]
args
)
throws
Exception
{
SampleProducer
producer
=
new
SampleProducer
();
int
count
=
20
;
CountDownLatch
latch
=
new
CountDownLatch
(
count
);
producer
.
sendMessage
(
TOPIC
,
count
,
latch
,
message
);
latch
.
await
(
10
,
TimeUnit
.
SECONDS
);
producer
.
close
();
SpringApplication
.
run
(
KafkaReactiveProducerAvroApplication
.
class
,
args
);
}
}
kafka-reactive-producerAvro/src/main/java/kafka/reactive/producerAvro/kafkareactiveproducerAvro/SampleProducer.java
0 → 100644
View file @
c72018bb
package
kafka
.
reactive
.
producerAvro
.
kafkareactiveproducerAvro
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.concurrent.CountDownLatch
;
import
com.genericAvro.PersonData
;
import
org.apache.kafka.clients.producer.*
;
import
org.apache.kafka.common.serialization.IntegerSerializer
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.PostMapping
;
import
org.springframework.web.bind.annotation.RequestBody
;
import
org.springframework.web.bind.annotation.RestController
;
import
reactor.core.publisher.Flux
;
import
reactor.kafka.sender.KafkaSender
;
import
reactor.kafka.sender.SenderOptions
;
import
reactor.kafka.sender.SenderRecord
;
@RestController
public
class
SampleProducer
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
SampleProducer
.
class
.
getName
());
private
static
final
String
BOOTSTRAP_SERVERS
=
"localhost:9092"
;
private
static
final
String
TOPIC
=
"topic-avro_1"
;
@Autowired
KafkaProducerConfig
kafkaProducerConfig
;
PersonData
personData
;
private
final
KafkaSender
<
Integer
,
String
>
sender
;
private
final
SimpleDateFormat
dateFormat
;
public
SampleProducer
()
{
Map
<
String
,
Object
>
props
=
new
HashMap
<>();
props
.
put
(
ProducerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
"localhost:9092"
);
props
.
put
(
ProducerConfig
.
CLIENT_ID_CONFIG
,
"sample-producer"
);
props
.
put
(
ProducerConfig
.
ACKS_CONFIG
,
"all"
);
props
.
put
(
ProducerConfig
.
KEY_SERIALIZER_CLASS_CONFIG
,
IntegerSerializer
.
class
);
props
.
put
(
ProducerConfig
.
VALUE_SERIALIZER_CLASS_CONFIG
,
StringSerializer
.
class
);
SenderOptions
<
Integer
,
String
>
senderOptions
=
SenderOptions
.
create
(
props
);
sender
=
KafkaSender
.
create
(
senderOptions
);
dateFormat
=
new
SimpleDateFormat
(
"HH:mm:ss:SSS z dd MMM yyyy"
);
}
//added
@PostMapping
(
value
=
"/postPerson"
,
consumes
=
{
"application/json"
},
produces
=
{
"application/json"
})
public
String
postJsonMessage
(
@RequestBody
PersonData
person
){
System
.
out
.
println
(
"inside"
+
person
.
getAge
()+
""
+
person
.
getName
());
Producer
<
String
,
PersonData
>
producer
=
kafkaProducerConfig
.
getProducer
();
personData
=
PersonData
.
newBuilder
()
.
setAge
(
person
.
getAge
())
.
setName
(
person
.
getName
())
.
setIncome
(
person
.
getIncome
())
.
build
();
System
.
out
.
println
(
"after fetching"
+
personData
.
getAge
()+
""
+
personData
.
getName
());
return
"Message published successfully"
;
}
//ended
public
void
sendMessage
(
String
topic
,
int
count
,
CountDownLatch
latch
)
throws
InterruptedException
{
sender
.<
Integer
>
send
(
Flux
.
range
(
1
,
count
)
.
map
(
i
->
SenderRecord
.
create
(
new
ProducerRecord
<>(
topic
,
i
,
personData
+
""
+
i
),
i
)))
.
doOnError
(
e
->
log
.
error
(
"Send failed"
,
e
))
.
subscribe
(
r
->
{
RecordMetadata
metadata
=
r
.
recordMetadata
();
System
.
out
.
printf
(
"Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n"
,
r
.
correlationMetadata
(),
metadata
.
topic
(),
metadata
.
partition
(),
metadata
.
offset
(),
dateFormat
.
format
(
new
Date
(
metadata
.
timestamp
())));
latch
.
countDown
();
});
}
public
void
close
()
{
sender
.
close
();
}
}
kafka-reactive-producerAvro/src/main/resources/application.properties
0 → 100644
View file @
c72018bb
server.port
=
9094
kafka-reactive-producerAvro/src/main/resources/avro/PersonData.avsc
0 → 100644
View file @
c72018bb
{
"type":"record",
"namespace":"com.genericAvro",
"name":"PersonData",
"doc":"This is Person Information",
"fields":[
{ "name":"name","type":"string"},
{ "name":"age","type":"string"},
{ "name":"income","type":"int"}
]
}
kafka-reactive-producerAvro/src/test/java/kafka/reactive/producerAvro/kafkareactiveproducerAvro/KafkaReactiveProducerAvroApplicationTests.java
0 → 100644
View file @
c72018bb
package
kafka
.
reactive
.
producerAvro
.
kafkareactiveproducerAvro
;
import
org.junit.jupiter.api.Test
;
import
org.springframework.boot.test.context.SpringBootTest
;
@SpringBootTest
class
KafkaReactiveProducerAvroApplicationTests
{
@Test
void
contextLoads
()
{
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment