"""Spark Structured Streaming consumer: reads a CloudKarafka topic over
SASL_SSL (SCRAM-SHA-256) and echoes message values to the console.

Runs until interrupted; each micro-batch is printed untruncated.
"""
import os

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = (
        SparkSession.builder
        .master("local")
        .appName("Kafka Consumer")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("INFO")

    # SECURITY: the original script committed live credentials to source.
    # Allow overriding via the environment; the in-source defaults preserve
    # the original behavior but should be rotated and removed.
    username = os.environ.get("CLOUDKARAFKA_USERNAME", "gdntyg54")
    password = os.environ.get(
        "CLOUDKARAFKA_PASSWORD", "Tp7JmiP3NPN90MNucuhtRNquipdNpF2k"
    )

    # FIX: kafka.bootstrap.servers must be plain host:port pairs. The
    # original prefixed each address with "SASL_SSL://", which Kafka's
    # bootstrap-address parser rejects; the transport is already selected
    # by the kafka.security.protocol option below.
    bootstrap_servers = ",".join(
        f"moped-0{i}.srvs.cloudkafka.com:9094" for i in (1, 2, 3)
    )

    jaas_config = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f"username='{username}' password='{password}';"
    )

    kafka = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", "gdntyg54-test_topic")
        .option("startingOffsets", "latest")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
        .option("kafka.sasl.jaas.config", jaas_config)
        .load()
    )

    # Kafka delivers the value column as binary; cast to a readable string.
    df = kafka.selectExpr("CAST(value AS STRING)")

    # FIX: start() returns the StreamingQuery; awaitTermination() returns
    # None. Bind the query handle first, then block on it.
    query = (
        df.writeStream
        .format("console")
        .option("truncate", "false")
        .start()
    )
    query.awaitTermination()