Commit fda8ff3b authored by Ramakanth Dhane's avatar Ramakanth Dhane

Modified the vehicle related files

parent 0cba1b22
package com.traffic.dao;
import java.util.Date;
import org.springframework.data.cassandra.repository.CassandraRepository;
import org.springframework.data.cassandra.repository.Query;
......
......@@ -10,10 +10,14 @@ import com.traffic.dao.entity.WindowTrafficData;
/**
* Response object containing traffic details that will be sent to dashboard.
*
* @author abaghel
*
*
*/
public class Response implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private List<TotalTrafficData> totalTraffic;
private List<WindowTrafficData> windowTraffic;
private List<POITrafficData> poiTraffic;
......
......@@ -17,7 +17,6 @@ import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.springframework.stereotype.Service;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.google.common.base.Optional;
......@@ -27,7 +26,7 @@ import com.traffic.processor.entity.WindowTrafficData;
import com.traffic.processor.util.GeoDistanceCalculator;
import com.traffic.processor.vo.AggregateKey;
import com.traffic.processor.vo.POIData;
import com.traffic.processor.vo.VehiclesData;
import com.traffic.processor.vo.Vehicle;
import scala.Tuple2;
import scala.Tuple3;
......@@ -41,7 +40,7 @@ public class SparkToCassandraDataProcessor {
*
* @param filteredIotDataStream IoT data stream
*/
public void processTotalTrafficData(JavaDStream<VehiclesData> filteredIotDataStream) {
public void processTotalTrafficData(JavaDStream<Vehicle> filteredIotDataStream) {
// We need to get count of vehicle group by routeId and vehicleType
JavaPairDStream<AggregateKey, Long> countDStreamPair = filteredIotDataStream
......@@ -75,7 +74,7 @@ public class SparkToCassandraDataProcessor {
*
* @param filteredIotDataStream IoT data stream
*/
public void processWindowTrafficData(JavaDStream<VehiclesData> filteredIotDataStream) {
public void processWindowTrafficData(JavaDStream<Vehicle> filteredIotDataStream) {
// reduce by key and window (30 sec window and 10 sec slide).
JavaPairDStream<AggregateKey, Long> countDStreamPair = filteredIotDataStream
......@@ -104,10 +103,10 @@ public class SparkToCassandraDataProcessor {
* @param nonFilteredIotDataStream original IoT data stream
* @param broadcastPOIValues variable containing POI coordinates, route and vehicle types to monitor.
*/
public void processPOIData(JavaDStream<VehiclesData> nonFilteredIotDataStream,Broadcast<Tuple3<POIData, String, String>> broadcastPOIValues) {
public void processPOIData(JavaDStream<Vehicle> nonFilteredIotDataStream,Broadcast<Tuple3<POIData, String, String>> broadcastPOIValues) {
// Filter by routeId,vehicleType and in POI range
JavaDStream<VehiclesData> iotDataStreamFiltered = nonFilteredIotDataStream
JavaDStream<Vehicle> iotDataStreamFiltered = nonFilteredIotDataStream
.filter(iot -> (iot.getRouteId().equals(broadcastPOIValues.value()._2())
&& iot.getVehicleType().contains(broadcastPOIValues.value()._3())
&& GeoDistanceCalculator.isInPOIRadius(Double.valueOf(iot.getLatitude()),
......@@ -116,7 +115,7 @@ public class SparkToCassandraDataProcessor {
broadcastPOIValues.value()._1().getRadius())));
// pair with poi
JavaPairDStream<VehiclesData, POIData> poiDStreamPair = iotDataStreamFiltered
JavaPairDStream<Vehicle, POIData> poiDStreamPair = iotDataStreamFiltered
.mapToPair(iot -> new Tuple2<>(iot, broadcastPOIValues.value()._1()));
// Transform to dstream of POITrafficData
......@@ -169,7 +168,7 @@ public class SparkToCassandraDataProcessor {
});
//Function to create POITrafficData object from IoT data
private static final Function<Tuple2<VehiclesData, POIData>, POITrafficData> poiTrafficDataFunc = (tuple -> {
private static final Function<Tuple2<Vehicle, POIData>, POITrafficData> poiTrafficDataFunc = (tuple -> {
POITrafficData poiTraffic = new POITrafficData();
poiTraffic.setVehicleId(tuple._1.getVehicleId());
poiTraffic.setVehicleType(tuple._1.getVehicleType());
......
......@@ -27,7 +27,7 @@ import com.google.common.base.Optional;
import com.traffic.processor.util.IoTDataDecoder;
import com.traffic.processor.util.PropertyFileReader;
import com.traffic.processor.vo.POIData;
import com.traffic.processor.vo.VehiclesData;
import com.traffic.processor.vo.Vehicle;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
......@@ -61,10 +61,10 @@ public class TrafficDataProcessor {
Set<String> topicsSet = new HashSet<String>();
topicsSet.add(topic);
//create direct kafka stream
JavaPairInputDStream<String, VehiclesData> directKafkaStream = KafkaUtils.createDirectStream(
JavaPairInputDStream<String, Vehicle> directKafkaStream = KafkaUtils.createDirectStream(
jssc,
String.class,
VehiclesData.class,
Vehicle.class,
StringDecoder.class,
IoTDataDecoder.class,
kafkaParams,
......@@ -73,21 +73,21 @@ public class TrafficDataProcessor {
logger.info("Starting Stream Processing");
//We need non filtered stream for poi traffic data calculation
JavaDStream<VehiclesData> nonFilteredIotDataStream = directKafkaStream.map(tuple -> tuple._2());
JavaDStream<Vehicle> nonFilteredIotDataStream = directKafkaStream.map(tuple -> tuple._2());
//We need filtered stream for total and traffic data calculation
JavaPairDStream<String,VehiclesData> iotDataPairStream = nonFilteredIotDataStream.mapToPair(iot -> new Tuple2<String,VehiclesData>(iot.getVehicleId(),iot)).reduceByKey((a, b) -> a );
JavaPairDStream<String,Vehicle> iotDataPairStream = nonFilteredIotDataStream.mapToPair(iot -> new Tuple2<String,Vehicle>(iot.getVehicleId(),iot)).reduceByKey((a, b) -> a );
// Check vehicle Id is already processed
JavaMapWithStateDStream<String, VehiclesData, Boolean, Tuple2<VehiclesData,Boolean>> iotDStreamWithStatePairs = iotDataPairStream
JavaMapWithStateDStream<String, Vehicle, Boolean, Tuple2<Vehicle,Boolean>> iotDStreamWithStatePairs = iotDataPairStream
.mapWithState(StateSpec.function(processedVehicleFunc).timeout(Durations.seconds(3600)));//maintain state for one hour
// Filter processed vehicle ids and keep un-processed
JavaDStream<Tuple2<VehiclesData,Boolean>> filteredIotDStreams = iotDStreamWithStatePairs.map(tuple2 -> tuple2)
JavaDStream<Tuple2<Vehicle,Boolean>> filteredIotDStreams = iotDStreamWithStatePairs.map(tuple2 -> tuple2)
.filter(tuple -> tuple._2.equals(Boolean.FALSE));
// Get stream of IoTdata
JavaDStream<VehiclesData> filteredIotDataStream = filteredIotDStreams.map(tuple -> tuple._1);
JavaDStream<Vehicle> filteredIotDataStream = filteredIotDStreams.map(tuple -> tuple._1);
//cache stream as it is used in total and window based computation
filteredIotDataStream.cache();
......@@ -113,8 +113,8 @@ public class TrafficDataProcessor {
jssc.awaitTermination();
}
//Funtion to check processed vehicles.
private static final Function3<String, Optional<VehiclesData>, State<Boolean>, Tuple2<VehiclesData,Boolean>> processedVehicleFunc = (String, iot, state) -> {
Tuple2<VehiclesData,Boolean> vehicle = new Tuple2<>(iot.get(),false);
private static final Function3<String, Optional<Vehicle>, State<Boolean>, Tuple2<Vehicle,Boolean>> processedVehicleFunc = (String, iot, state) -> {
Tuple2<Vehicle,Boolean> vehicle = new Tuple2<>(iot.get(),false);
if(state.exists()){
vehicle = new Tuple2<>(iot.get(),true);
}else{
......
package com.traffic.processor.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.traffic.processor.vo.VehiclesData;
import com.traffic.processor.vo.Vehicle;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
......@@ -12,16 +12,16 @@ import kafka.utils.VerifiableProperties;
*
*
*/
public class IoTDataDecoder implements Decoder<VehiclesData> {
public class IoTDataDecoder implements Decoder<Vehicle> {
private static ObjectMapper objectMapper = new ObjectMapper();
public IoTDataDecoder(VerifiableProperties verifiableProperties) {
}
public VehiclesData fromBytes(byte[] bytes) {
public Vehicle fromBytes(byte[] bytes) {
try {
return objectMapper.readValue(bytes, VehiclesData.class);
return objectMapper.readValue(bytes, Vehicle.class);
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -4,7 +4,7 @@ import java.io.Serializable;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
public class VehiclesData implements Serializable{
public class Vehicle implements Serializable{
/**
*
......@@ -20,11 +20,11 @@ public class VehiclesData implements Serializable{
private double speed;
private double fuelLevel;
public VehiclesData(){
public Vehicle(){
}
public VehiclesData(String vehicleId, String vehicleType, String routeId, String latitude, String longitude,
public Vehicle(String vehicleId, String vehicleType, String routeId, String latitude, String longitude,
Date timestamp, double speed, double fuelLevel) {
super();
this.vehicleId = vehicleId;
......
......@@ -10,7 +10,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.traffic.kafka.vo.VehiclesData;
import com.traffic.kafka.vo.Vehicle;
import java.util.HashMap;
import java.util.Map;
......@@ -30,12 +30,12 @@ public class TrafficDataSenderConfig {
}
@Bean
public ProducerFactory<String, VehiclesData> producerFactory() {
public ProducerFactory<String, Vehicle> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, VehiclesData> kafkaTemplate() {
public KafkaTemplate<String, Vehicle> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
......@@ -20,23 +20,22 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import com.traffic.kafka.vo.VehiclesData;
import com.traffic.kafka.vo.Vehicle;
@Service
public class TrafficDataSender {
private static final Logger LOG = LoggerFactory.getLogger(TrafficDataSender.class);
@Autowired
private KafkaTemplate<String, VehiclesData> kafkaTemplate;
private KafkaTemplate<String, Vehicle> kafkaTemplate;
@Value("${com.kafka.topic}")
private String topic;
public void send(VehiclesData data){
public void send(Vehicle data){
LOG.info("sending data='{}' to topic='{}'", data, topic);
Message<VehiclesData> message = MessageBuilder
Message<Vehicle> message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC, topic)
.build();
......@@ -51,7 +50,7 @@ public class TrafficDataSender {
//logger.info("Sending events");
// generate event in loop
while (true) {
List<VehiclesData> eventList = new ArrayList<VehiclesData>();
List<Vehicle> eventList = new ArrayList<Vehicle>();
for (int i = 0; i < 100; i++) {// create 100 vehicles
String vehicleId = UUID.randomUUID().toString();
String vehicleType = vehicleTypeList.get(rand.nextInt(5));
......@@ -63,12 +62,12 @@ public class TrafficDataSender {
String coords = getCoordinates(routeId);
String latitude = coords.substring(0, coords.indexOf(","));
String longitude = coords.substring(coords.indexOf(",") + 1, coords.length());
VehiclesData event = new VehiclesData(vehicleId, vehicleType, routeId, latitude, longitude, timestamp, speed,fuelLevel);
Vehicle event = new Vehicle(vehicleId, vehicleType, routeId, latitude, longitude, timestamp, speed,fuelLevel);
eventList.add(event);
}
}
Collections.shuffle(eventList);// shuffle for random events
for (VehiclesData event : eventList) {
for (Vehicle event : eventList) {
send(event);
Thread.sleep(rand.nextInt(3000 - 1000) + 1000);//random delay of 1 to 3 seconds
}
......
......@@ -6,11 +6,8 @@ import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
public class VehiclesData implements Serializable{
public class Vehicle implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String vehicleId;
private String vehicleType;
......@@ -22,11 +19,11 @@ public class VehiclesData implements Serializable{
private double speed;
private double fuelLevel;
public VehiclesData(){
public Vehicle(){
}
public VehiclesData(String vehicleId, String vehicleType, String routeId, String latitude, String longitude,
public Vehicle(String vehicleId, String vehicleType, String routeId, String latitude, String longitude,
Date timestamp, double speed, double fuelLevel) {
super();
this.vehicleId = vehicleId;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment