A simple streaming pipeline for GPS data

We can all agree that the name “Kafka” can be intimidating, but it’s no big deal.

Data pipelines come in two flavors: batch or stream. Batch pipelines process data in chunks and generally run on a schedule. Stream pipelines, on the other hand, process data as it arrives, so the outputs are produced in real time.

In most cases, batch processing is enough and it can be much cheaper than stream processing, but we don’t have to wait until the streaming option is strictly justified to start learning it.

In this article you’ll get introduced to Apache Kafka and its main concepts, and you’ll follow the design and implementation of a practical use case.


Series of Curious Questions about Kafka

https://media.giphy.com/media/5XRB3Ay93FZw4/giphy.gif

This section answers the questions you should be asking yourself about Kafka; it will give you the knowledge you need to follow the second section.

What’s Apache Kafka?

  • Kafka is an event streaming platform where you publish and subscribe to streams of events, store them, query them, and process them as they arrive.
  • It was originally developed at LinkedIn and is written in Java and Scala.

How distributed is it?

  • Kafka is highly scalable: its storage layer can be distributed and replicated across multiple servers (called brokers) to ensure reliability and performance.
  • Brokers are coordinated by ZooKeeper, which keeps the cluster metadata and decides which broker leads and which ones follow.
  • ZooKeeper is a hard dependency of Kafka for now, but upcoming releases are removing it (KRaft mode).

How can I read from / write to Kafka?

  • Kafka receives data from producers and sends it to consumers.
  • You can easily write your producers/consumers in any language you like.
  • Even components like Kafka Connect and Kafka Streams behave, under the hood, as producers and consumers (a minimal producer/consumer example follows this list).
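
To make that concrete, here is a minimal producer/consumer sketch, assuming the kafka-python library, a broker reachable at localhost:9092, and a placeholder topic called “demo”:

import json
from kafka import KafkaProducer, KafkaConsumer

# Producer: publish one JSON message to a placeholder "demo" topic
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))
producer.send("demo", {"hello": "kafka"})
producer.flush()

# Consumer: read the topic from the beginning and print whatever is there
consumer = KafkaConsumer(
    "demo",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=5000)  # give up after 5 s of silence
for message in consumer:
    print(message.value)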

Where does the data live exactly? Topics.

  • The data you send to or read from Kafka lives in Kafka topics.
  • Each producer/consumer can interact with one or more topics.
  • A Kafka topic can be thought of as a durable log (or queue) that can still be read even after its messages have been consumed.
  • Topics can be partitioned and replicated (see the small topic-creation example right after this list).
  • Kafka sends acknowledgments (ACKs) so producers can be sure their data was delivered.
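
Here is that topic-creation example: a minimal sketch using kafka-python’s admin client, assuming a single local broker (hence a replication factor of 1) and the “traffic” topic we’ll use later:

from kafka.admin import KafkaAdminClient, NewTopic

# Create the "traffic" topic with 3 partitions; replication factor 1 is
# enough for a single local broker (use more in a real cluster)
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([NewTopic(name="traffic",
                              num_partitions=3,
                              replication_factor=1)])
admin.close()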

Can I integrate Kafka with other systems? Kafka Connect

  • Kafka Connect does literally that: it connects Kafka to other systems (e.g., databases).
  • Since it’s a pretty common use case, ready-made connectors for many well-known systems are available on Confluent’s website.
  • Sink connectors act as consumers: they read from Kafka and write to the target system.
  • Source connectors act as producers: they read from source systems and write to Kafka.
  • In practice, little coding is needed here: unless you want to write a custom connector, you only have to worry about its configuration (a quick REST API example follows this list).
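
For a first taste of the Connect REST API (which we’ll use again in Step 2), here is a small sketch that asks a running Connect worker which connector plugins it has installed. It assumes the worker exposes its API on localhost:8083, Kafka Connect’s default REST port:

import requests

# List the connector plugins installed on a running Connect worker
response = requests.get("http://localhost:8083/connector-plugins")
for plugin in response.json():
    print(plugin["class"], plugin["version"])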

Example: Streaming real-time GPS data

https://media.giphy.com/media/Dk57URqjqjHjNGHeMV/giphy.gif

Step 0: Let’s find a purpose for what we are doing

It’s more fun to work on something with a purpose than to practice for its own sake, so let’s create a problem to solve.

Imagine you have a food delivery company and you want to watch your drivers’ movements in real time on a dashboard. As a business owner, this can help you:
- monitor some of the main KPIs of your business in real time
- analyze traffic data to optimize delivery routes
- redistribute drivers to handle high demand

We’ll need to respect the following constraints:
- Data must be collected from multiple phones
- Data should be available for real-time processing even after storage
- Data should be stored in a database where the team can access it

Now that we have a clear purpose for what we are building, let’s design a solution.

Step 1: Prepare the infrastructure

We’ll respond to the stated constraints with the following design decisions:

  • Since the real-time constraint is central here, Kafka will be the heart of our architecture: everything pushes data to it and reads data from it.
  • For the link between Kafka and the database, we’ll use Kafka Connect, because it will be far more reliable than anything we build from scratch and only requires some quick configuration.
  • We’ll use MongoDB as the database: it’s intuitive to work with, scales easily to handle heavy writes, and is convenient since our data has no schema we must enforce.
  • For the GPS data, we’ll fake it using Lockito and send it from the phone using Sensorstream, which streams readings over UDP on the local Wi-Fi network. With a real app, I think we could use WebSockets for this.
  • The producer will listen on the specified UDP port, receive the data from the phones, process it, and send it to Kafka.
architecture.png

Step 2: Connecting Kafka and MongoDB

To connect Kafka with MongoDB we’ll use a ready-made connector provided by MongoDB and Confluent. It’s basically a .jar file that plugs into a base Kafka Connect installation.

Confluent provides a baseline Kafka Connect Docker image; we’ll build another image on top of it that includes the MongoDB connector. The Dockerfile looks like the following.

# Start from Confluent's base Kafka Connect image
FROM confluentinc/cp-kafka-connect:latest

# Install the MongoDB connector from Confluent Hub
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:latest

# Make sure Connect also looks for plugins in the Confluent Hub directory
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

The Connect image will run alongside Kafka and MongoDB. The whole infrastructure is configured in a docker-compose file, which you can find in the GitHub repository; it sets up the Connect container so that it knows about Kafka and is connected to it.

Once our infrastructure is up and running, we’ll call the Kafka Connect REST API to create a sink connection between Kafka and our MongoDB. This could be done with a curl command, but I prefer Python scripts.

The following script tells Kafka Connect to create a sink connector named “mongo-sink”, with a configuration that specifies where the database lives and which topic should pour into it. We could configure the sink’s behavior further (e.g., rate limiting, preprocessing), but we won’t in this example.

import requests

# Connector definition: which connector class to use, where MongoDB lives,
# and which Kafka topic should be sunk into which database/collection
payload = {
    "name": "mongo-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "connection.uri": "mongodb://USERNAME:PASSWORD@HOST:27017",
        "database": "kafka_topics",
        "collection": "traffic",
        "topics": "traffic",

        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable": "true",
    }
}

# Kafka Connect REST API, exposed by the Connect container
url = "http://localhost:8083/connectors"

if __name__ == '__main__':
    r = requests.post(url,
                      json=payload,
                      headers={"Content-Type": "application/json"})
    print(r)
    print(r.content)
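
As an optional sanity check, you can ask Kafka Connect for the status of the connector you just created through the standard /connectors/<name>/status endpoint; a state of “RUNNING” means the sink is alive:

import requests

# Ask Kafka Connect for the state of the connector we just created
status = requests.get("http://localhost:8083/connectors/mongo-sink/status")
print(status.json())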

After this step, everything should be ready to handle our data flow smoothly.

Step 3: Receiving data from the phone

As mentioned before, we are going to test our infrastructure with fake GPS data generated by Lockito and sent by Sensorstream. I won’t go into detail, but it’s straightforward if you follow these steps:

  1. Install both apps (sorry, Apple users).
  2. Connect your phone to the same Wi-Fi network as your PC.
  3. Define a path in Lockito and run it; it will make your GPS coordinates change.
  4. Enter your PC’s IP address in Sensorstream (on Windows, you can find it with the ipconfig command).
  5. Start the stream.

After completing these steps, your phone should be sending data to your laptop. We now need to catch that data.
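
Before wiring in Kafka, a quick way to confirm that packets are really arriving is a throwaway UDP listener that just prints whatever comes in on port 5555 (the same port the producer below binds to):

import socket

# Throwaway check: print every UDP packet arriving on port 5555
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("", 5555))  # listen on all interfaces

while True:
    data, addr = sock.recvfrom(1024)
    print(addr, data.decode(errors="replace"))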

We are going to write a Kafka producer that listens to this stream of data and pushes it directly into the Kafka topic connected to the database. The code looks like this.

import datetime
import json
import socket

from kafka import KafkaProducer


class UdpReceiver:
    def __init__(self, port=5555):
        # UDP socket listening on all interfaces, on the port Sensorstream sends to
        self.UDP_IP = ""
        self.UDP_PORT = port
        self.sock = socket.socket(socket.AF_INET,    # Internet
                                  socket.SOCK_DGRAM)  # UDP
        self.sock.bind((self.UDP_IP, self.UDP_PORT))

        # Kafka producer: serialize every message as JSON
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda x: json.dumps(x).encode("utf-8"))

    # By default, it will listen for 5 minutes and stop
    def run(self, duration_s=5 * 60):
        start = datetime.datetime.now()
        end = datetime.datetime.now()
        while (end - start).seconds < duration_s:
            data, addr = self.sock.recvfrom(1024)
            end = datetime.datetime.now()

            # Build the record: sender address, parsed GPS payload, timestamp
            # (parse_payload is a small helper, defined elsewhere in the project,
            # that turns the raw Sensorstream text into a dict)
            d = {}
            d['address'] = addr
            d['payload'] = parse_payload(data)
            d['time'] = end.isoformat()

            print(d['payload'])
            self.producer.send('traffic', d)

    def __exit__(self, exc_type, exc_value, traceback):
        self.sock.close()


if __name__ == "__main__":
    udp_receiver = UdpReceiver()
    udp_receiver.run()
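
To check the whole chain end to end, you can peek into MongoDB and make sure documents are landing in the traffic collection. A small sketch with pymongo, assuming the same connection string, database, and collection configured in the sink above:

from pymongo import MongoClient

# Connect to the same MongoDB instance the sink connector writes to
# (replace USERNAME, PASSWORD and HOST with your own values)
client = MongoClient("mongodb://USERNAME:PASSWORD@HOST:27017")
collection = client["kafka_topics"]["traffic"]

# Print the five most recently inserted GPS records
for doc in collection.find().sort("_id", -1).limit(5):
    print(doc)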

Congrats 🎉🎉🎉!! Our real-time data pipeline is up and running. You can test it with multiple phones, and try visualizing the data on a dashboard to monitor your lazy ass drivers.

https://media.giphy.com/media/LZElUsjl1Bu6c/giphy.gif

Action

I hope this article will add value to your career as a data professional. Feel free to read more on the blog and share with your friends.

Thank you for your time.

You can find the code in this repo.