Built for realtime: Big data messaging with Apache Kafka, Part 1

Build a continuous big data messaging system with Apache Kafka

big data messaging system / information architecture / mosaic infrastructure
Stinging Eyes (CC BY-SA 2.0)

When the big data movement started it was mostly focused on batch processing. Distributed data storage and querying tools like MapReduce, Hive, and Pig were all designed to process data in batches rather than continuously. Businesses would run multiple jobs every night to extract data from a database, then analyze, transform, and eventually store the data. More recently enterprises have discovered the power of analyzing and processing data and events as they happen, not just once every few hours. Most traditional messaging systems don't scale up to handle big data in realtime, however. So engineers at LinkedIn built and open-sourced Apache Kafka: a distributed messaging framework that meets the demands of big data by scaling on commodity hardware.

Over the past few years, Apache Kafka has emerged to solve a variety of use cases. In the simplest case, it could be a simple buffer for storing application logs. Combined with a technology like Spark Streaming, it can be used to track data changes and take action on that data before saving it to a final destination. Kafka's predictive mode makes it a powerful tool for detecting fraud, such as checking the validity of a credit card transaction when it happens, and not waiting for batch processing hours later.

This two-part tutorial introduces Kafka, starting with how to install and run it in your development environment. You'll get an overview of Kafka's architecture, followed by an introduction to developing an out-of-the-box Apache Kafka messaging system. Finally, you'll build a custom producer/consumer application that sends and consumes messages via a Kafka server. In the second half of the tutorial you'll learn how to partition and group messages, and how to control which messages a Kafka consumer will consume.

What is Apache Kafka?

Apache Kafka is messaging system built to scale for big data. Similar to Apache ActiveMQ or RabbitMq, Kafka enables applications built on different platforms to communicate via asynchronous message passing. But Kafka differs from these more traditional messaging systems in key ways:

  • It's designed to scale horizontally, by adding more commodity servers.
  • It provides much higher throughput for both producer and consumer processes.
  • It can be used to support both batch and real-time use cases.
  • It doesn't support JMS, Java's message-oriented middleware API.

Apache Kafka's architecture

Before we explore Kafka's architecture, you should know its basic terminology:

  • A producer is process that can publish a message to a topic.
  • a consumer is a process that can subscribe to one or more topics and consume messages published to topics.
  • A topic category is the name of the feed to which messages are published.
  • A broker is a process running on single machine.
  • A cluster is a group of brokers working together.
Figure 1: Kafka's architecture

Figure 1. Architecture of a Kafka message system

Apache Kafka's architecture is very simple, which can result in better performance and throughput in some systems. Every topic in Kafka is like a simple log file. When a producer publishes a message, the Kafka server appends it to the end of the log file for its given topic. The server also assigns an offset, which is a number used to permanently identify each message. As the number of messages grows, the value of each offset increases; for example if the producer publishes three messages the first one might get an offset of 1, the second an offset of 2, and the third an offset of 3.

When the Kafka consumer first starts, it will send a pull request to the server, asking to retrieve any messages for a particular topic with an offset value higher than 0. The server will check the log file for that topic and return the three new messages. The consumer will process the messages, then send a request for messages with an offset higher than 3, and so on.

In Kafka, the client is responsible for remembering the offset count and retrieving messages.The Kafka server doesn't track or manage message consumption. By default, a Kafka server will keep a message for seven days. A background thread in the server checks and deletes messages that are seven days or older. A consumer can access messages as long as they are on the server. It can read a message multiple times, and even read messages in reverse order of receipt. But if the consumer fails to retrieve the message before the seven days are up, it will miss that message.

Apache Kafka quick setup and demo

We'll build a custom application in this tutorial, but let's start by installing and testing a Kafka instance with an out-of-the-box producer and consumer.

  1. Visit the Kafka download page to install the most recent version (0.9 as of this writing).
  2. Extract the binaries into a software/kafka folder. For the current version it's software/kafka_2.11-0.9.0.0.
  3. Change your current directory to point to the new folder.
  4. Start the Zookeeper server by executing the command: bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. Start the Kafka server by executing: bin/kafka-server-start.sh config/server.properties.
  6. Create a test topic that you can use for testing: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Start a simple console consumer that can consume messages published to a given topic, such as javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. Start up a simple producer console that can publish messages to the test topic: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. Try typing one or two messages into the producer console. Your messages should show in the consumer console.

Example application with Apache Kafka

You've seen how Apache Kafka works out of the box. Next, let's develop a custom producer/consumer application. The producer will retrieve user input from the console and send each new line as a message to a Kafka server. The consumer will retrieve messages for a given topic and print them to the console. The producer and consumer components in this case are your own implementations of kafka-console-producer.sh and kafka-console-consumer.sh.

Let's start by creating a Producer.java class. This client class contains logic to read user input from the console and send that input as a message to the Kafka server.

We configure the producer by creating an object from the java.util.Properties class and setting its properties. The ProducerConfig class defines all the different properties available, but Kafka's default values are sufficient for most uses. For the default config we only need to set three mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the value of the first broker's host:port. The Kafka client will use this value to make a discover call on the broker, which will return a list of all the brokers in the cluster. It's a good idea to specify more than one broker in the BOOTSTRAP_SERVERS_CONFIG, so that if that first broker is down the client will be able to try other brokers.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer


        public class Producer {
          private static Scanner in;
          public static void main(String[] argv)throws Exception {
              if (argv.length != 1) {
                  System.err.println("Please specify 1 parameters ");
                  System.exit(-1);
              }
              String topicName = argv[0];
              in = new Scanner(System.in);
              System.out.println("Enter message(type exit to quit)");

              //Configure the Producer
              Properties configProperties = new Properties();
              configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
              configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
              configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

              org.apache.kafka.clients.producer.Producer producer = new KafkaProducer<String, String>(configProperties);
              String line = in.nextLine();
              while(!line.equals("exit")) {
                  ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);
                  producer.send(rec);
                  line = in.nextLine();
              }
              in.close();
              producer.close();
          }
        }
      

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

In the case of the example application, we know the producer is using ByteArraySerializer for the key and StringSerializer for the value. On the client side we therefore need to use org.apache.kafka.common.serialization.ByteArrayDeserializer for the key and org.apache.kafka.common.serialization.StringDeserializer for the value. Setting those classes as values for KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG will enable the consumer to deserialize byte[] encoded types sent by the producer.

Finally, we need to set the value of the GROUP_ID_CONFIG. This should be a group name in string format. I'll explain more about this config in a minute. For now, just look at the Kafka consumer with the four mandatory properties set:

1 2 Page 1
Page 1 of 2