import logging
from json import dumps

from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


def on_send_success(record_metadata):
    # A successful send returns the assigned topic, partition and offset.
    print("Topic: {0}".format(record_metadata.topic))
    print("Partition: {0}".format(record_metadata.partition))
    print("Offset: {0}".format(record_metadata.offset))


def on_send_error(excp):
    log.error("I am an errback", exc_info=excp)


if __name__ == '__main__':
    spark = SparkSession.builder \
        .master("local") \
        .appName("Kafka Batch Publish") \
        .getOrCreate()

    # Producer authenticates over SASL_SSL with SCRAM-SHA-256 and
    # JSON-serializes each value before publishing.
    # Use security_protocol='SASL_PLAINTEXT' for a non-TLS listener.
    producer = KafkaProducer(
        sasl_mechanism='SCRAM-SHA-256',
        security_protocol='SASL_SSL',
        sasl_plain_username='gdntyg54',
        sasl_plain_password='Tp7JmiP3NPN90MNucuhtRNquipdNpF2k',
        bootstrap_servers=['moped-01.srvs.cloudkafka.com:9094',
                           'moped-02.srvs.cloudkafka.com:9094',
                           'moped-03.srvs.cloudkafka.com:9094'],
        # key_serializer=str.encode,
        value_serializer=lambda x: dumps(x).encode('utf-8'))

    schema = StructType([StructField("lsoa_code", StringType(), True),
                         StructField("borough", StringType(), True),
                         StructField("major_category", StringType(), True),
                         StructField("minor_category", StringType(), True),
                         StructField("value", StringType(), True),
                         StructField("year", StringType(), True),
                         StructField("month", StringType(), True)])

    crimesDF = spark.read.format("csv").schema(schema).load(
        r"C:\Users\jvisvanathan\PycharmProjects\PycharmExample\SparkStreaming\datasets\dropLocation")
    crimesDF.createOrReplaceTempView("crimesTbl")

    # Shape each record as a (key, value) pair: major_category keys the
    # message and the remaining columns are concatenated into a CSV value.
    crimes = spark.sql("""select major_category as key,
                                 CONCAT(lsoa_code, ',', major_category, ',', minor_category,
                                        ',', value, ',', year, ',', month) as value
                          from crimesTbl""")

    for row in crimes.rdd.take(10):
        print("Key: {0}, Value: {1}".format(row.key, row.value))

    kafka_topic = "gdntyg54-test_topic"

    for row in crimes.take(10):
        message = {"key": row.key, "value": row.value}
        print("Message to be sent: {0}".format(message))

        # Asynchronous JSON publish: send() returns a future and the
        # value_serializer above encodes the dict.
        ack = producer.send(kafka_topic, message)
        # To enable hashed partitioning, produce keyed messages instead
        # (uncomment key_serializer above):
        # ack = producer.send(kafka_topic, key=row.key, value=row.value)
        # Callbacks can also be chained onto the returned future:
        # ack = producer.send(kafka_topic, message) \
        #     .add_callback(on_send_success).add_errback(on_send_error)

        metadata = ack.get()  # block until this single message is acknowledged

    producer.flush()  # drain any remaining buffered messages
    producer.close()
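
# Optional verification helper: a minimal sketch, assuming kafka-python's
# KafkaConsumer with the same SASL_SSL/SCRAM settings as the producer above.
# The deserializer mirrors the producer's JSON value_serializer; the
# consumer_timeout_ms value below is an illustrative choice, not from the
# original script. Not called by default, e.g. run
# verify_with_consumer(kafka_topic) after the publish loop.
def verify_with_consumer(topic):
    from json import loads
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=['moped-01.srvs.cloudkafka.com:9094',
                           'moped-02.srvs.cloudkafka.com:9094',
                           'moped-03.srvs.cloudkafka.com:9094'],
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-256',
        sasl_plain_username='gdntyg54',
        sasl_plain_password='Tp7JmiP3NPN90MNucuhtRNquipdNpF2k',
        value_deserializer=lambda b: loads(b.decode('utf-8')),
        auto_offset_reset='earliest',
        consumer_timeout_ms=10000)  # stop iterating after 10s of silence
    for msg in consumer:
        print("Consumed: partition={0} offset={1} value={2}".format(
            msg.partition, msg.offset, msg.value))
    consumer.close()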
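
# Alternative batch publish: a sketch using Spark's built-in Kafka sink
# instead of iterating rows through kafka-python. This assumes the
# spark-sql-kafka-0-10 package is on the classpath; publish_with_spark_sink
# is a hypothetical helper name, while the kafka.* options are standard
# pass-through settings for the Kafka sink. Usage would be
# publish_with_spark_sink(crimes, kafka_topic).
def publish_with_spark_sink(df, topic):
    jaas = ("org.apache.kafka.common.security.scram.ScramLoginModule required "
            "username='gdntyg54' password='Tp7JmiP3NPN90MNucuhtRNquipdNpF2k';")
    # The Kafka sink expects string (or binary) 'key' and 'value' columns.
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .write \
      .format("kafka") \
      .option("kafka.bootstrap.servers",
              "moped-01.srvs.cloudkafka.com:9094,"
              "moped-02.srvs.cloudkafka.com:9094,"
              "moped-03.srvs.cloudkafka.com:9094") \
      .option("kafka.security.protocol", "SASL_SSL") \
      .option("kafka.sasl.mechanism", "SCRAM-SHA-256") \
      .option("kafka.sasl.jaas.config", jaas) \
      .option("topic", topic) \
      .save()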