Complete Guide to Kafka: Event-Driven Architecture, Serialization, Streams, and More
Event-Driven Architecture
Event-Driven Architecture (EDA) is a software architecture pattern that emphasizes the production, detection, consumption, and reaction to events. In EDA, systems communicate and interact through events, which represent significant occurrences or changes in the system or its environment. Events can be anything of interest to the system, such as user actions, sensor readings, database updates, or external notifications.
Benefits:
- Decoupling: EDA promotes loose coupling between components, as they communicate through events. This allows for flexibility in system evolution, as components can be added, modified, or replaced without affecting others.
- Encapsulation: Events encapsulate information about a specific occurrence, making it easier to understand and handle the system's behavior.
- Optimization: EDA enables the optimization of individual components independently, as they can process events asynchronously and at their own pace.
- Scalability: EDA facilitates horizontal scalability by distributing events across multiple consumers or subscribers, enabling efficient handling of high event volumes.
Drawbacks:
- Steep Learning Curve: EDA introduces new concepts and design principles, which can require a learning curve for developers unfamiliar with the pattern.
- Complexity in Maintainability: Asynchronous communication and event-driven systems can introduce complexity in terms of debugging, testing, and maintaining the overall system.
- Loss of Transactionality: EDA often relies on eventual consistency instead of immediate consistency, which means handling failures or ensuring atomicity across multiple events can be challenging.
- Lineage: Tracking the full lineage or history of an entity's state can be more complex in an event-driven system compared to a traditional request/response architecture.
EDA is well-suited for certain scenarios, such as:
- Highly distributed systems: EDA enables communication and coordination between loosely coupled components across different locations or domains.
- Complex business processes: EDA allows for flexibility and adaptability in modeling and implementing complex business workflows.
- Event-driven domains: Domains that naturally produce a significant number of events, such as IoT, real-time analytics, financial systems, and social networks.
Examples of enterprise architectures leveraging EDA include:
- Financial systems: Trading platforms, fraud detection systems, and payment processing systems can benefit from EDA to handle high volumes of events and enable real-time processing.
- IoT systems: Smart homes, industrial automation, and asset tracking systems often rely on EDA to handle events from various sensors and devices.
- Microservices architectures: EDA fits well within microservices-based systems, where events serve as the means of communication and coordination between services.
Messages, Events, and Commands: In an event-driven system, different types of communication patterns are used:
- Events: Events represent the occurrence of something significant in the system. They are usually immutable and communicate what has happened in the past. Events are often used to notify interested parties about changes or trigger further actions.
- Commands: Commands are requests for a specific action to be performed. Unlike events, which describe something that has already happened, commands express an intention or instruction to carry out a specific task and are typically directed at a specific handler, which may accept or reject them.
- Messages: Messages are a broader category that includes both events and commands. They are the units of communication between components in an event-driven system.
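To make the distinction concrete, it is often reflected directly in type names and shapes. A minimal sketch in Java (the class and field names are hypothetical, and Java 16+ records are used for brevity):
// Event: records a fact that already happened; named in the past tense and immutable.
public record OrderPlaced(String orderId, String customerId, long placedAtEpochMillis) {}

// Command: expresses an intention to perform an action; named in the imperative and
// directed at a specific handler that may accept or reject it.
public record PlaceOrder(String orderId, String customerId) {}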
Event Storming: Event Storming is a collaborative workshop technique used to explore, analyze, and design event-driven systems. It involves gathering stakeholders, domain experts, and developers to create a shared understanding of the business processes and events within a system. Event Storming sessions typically utilize sticky notes on a wall to represent events, commands, and actors, facilitating discussions and identifying potential challenges or opportunities within the system.
Kafka
Kafka is an open-source distributed streaming platform designed to handle high-volume, real-time data streams. It provides a highly scalable, fault-tolerant, and durable architecture for building robust data pipelines, event-driven systems, and real-time data processing applications.
Reasons for choosing Kafka include:
- Scalability: Kafka is designed to handle high-throughput and high-volume data streams, making it suitable for use cases with large data volumes or high message rates.
- Durability: Kafka stores data in a fault-tolerant and durable manner, allowing data to be reliably consumed even in the face of failures.
- Real-time Processing: Kafka enables real-time data processing by allowing producers and consumers to work with streams of data as they arrive, enabling near real-time analytics, monitoring, and event-driven architectures.
- Flexibility: Kafka provides a publish-subscribe messaging model, which allows data to be consumed by multiple subscribers in real-time, enabling decoupled and scalable systems.
Messaging System: Kafka serves as a distributed messaging system, where producers write messages to topics, and consumers read those messages. Messages in Kafka are organized into topics, which act as logical channels or categories for the published data. Producers and consumers communicate with Kafka asynchronously, decoupling data producers from data consumers.
Distributed Storage: Kafka utilizes a distributed storage model, allowing data to be replicated and distributed across multiple servers or nodes in a Kafka cluster. This approach ensures fault tolerance, high availability, and scalability by allowing data to be stored across multiple machines.
Data Processing: Kafka provides support for stream processing, allowing developers to build real-time data processing applications. Streams of data can be processed using Kafka Streams, a client library that provides an API for building scalable and fault-tolerant stream processing applications.
Kafka Broker, Clusters, ZooKeeper: A Kafka cluster consists of multiple Kafka brokers, which are individual instances of Kafka running on different machines. Each broker is responsible for storing and serving a portion of the partitions of the cluster's topics. ZooKeeper is used for coordinating and managing the Kafka cluster, including maintaining metadata and electing the controller and partition leaders. (Older Kafka versions also stored consumer offsets in ZooKeeper; modern versions keep them in an internal Kafka topic, and recent releases can run without ZooKeeper entirely using KRaft mode.)
Kafka Record (Key, Value, Timestamp): A Kafka record is the basic unit of data in Kafka. It consists of a key, a value, and a timestamp. The key and value are byte arrays, allowing for flexible serialization of data. The key is optional and can be used for partitioning purposes, while the value carries the actual payload of the message. The timestamp represents the time at which the record was produced.
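For illustration, the Java producer client lets you set the key, value, and timestamp of a record explicitly (the topic name and values below are placeholders):
// ProducerRecord(topic, partition, timestamp, key, value): a null partition lets Kafka
// choose one from the key, and a null timestamp defaults to the current time.
ProducerRecord<String, String> record =
    new ProducerRecord<>("my_topic", null, System.currentTimeMillis(), "my_key", "my_value");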
Kafka Topics: Kafka topics are the channels or categories to which messages are published. They represent a particular stream of records. Topics can be partitioned to enable parallelism and scalability. Important concepts related to Kafka topics include:
- Deletion: Topics can be deleted, removing all associated data.
- Retention: Kafka retains messages in topics for a configurable period or size, allowing consumers to fetch historical data.
- Compaction: Kafka supports log compaction, which ensures that only the latest value for each key is retained in a topic.
- Partitions: Topics can be divided into multiple partitions, allowing for parallel processing and horizontal scalability.
- Replicated Partitions: Kafka provides replication of partitions across multiple brokers for fault tolerance and high availability.
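As a sketch of how these topic-level settings come together, topics can also be created programmatically with the Java AdminClient (the topic names and configuration values below are illustrative, not recommendations):
import org.apache.kafka.clients.admin.*;
import java.util.*;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    // A compacted topic: 3 partitions, replication factor 1, only the latest value per key is kept.
    NewTopic compacted = new NewTopic("my_compacted_topic", 3, (short) 1)
        .configs(Map.of("cleanup.policy", "compact"));

    // A regular topic with time-based retention: records older than 7 days are deleted.
    NewTopic events = new NewTopic("my_events_topic", 3, (short) 1)
        .configs(Map.of("retention.ms", "604800000"));

    // Blocks until the brokers have created both topics (the calling method must
    // handle or declare InterruptedException and ExecutionException).
    admin.createTopics(List.of(compacted, events)).all().get();
}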
Kafka Producer: The Kafka producer is responsible for publishing messages to Kafka topics. It writes messages to specific topics, which are then made available for consumption by consumers. Producers can choose to send messages with or without a key, and they can specify the desired topic for each message.
Key and Value Serializers: Producers need to specify serializers that convert the key and value objects into byte arrays for transmission over the network. Kafka provides various serializer options, such as StringSerializer, ByteArraySerializer, or custom serializers for specific data formats.
Java Example of Producer:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092"); // Bootstrap servers
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "my_key", "my_value");

// Send asynchronously; the callback runs once the broker acknowledges the record.
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
        }
    }
});

producer.close();
Kafka Consumer: Kafka consumers read and process messages from Kafka topics. They subscribe to one or more topics and consume records published to those topics. Consumers can be part of a consumer group, which allows for parallel processing of messages.
Java Example of Consumer:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092"); // Bootstrap servers
props.put("group.id", "my_consumer_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my_topic"));

// Poll in a loop; call consumer.close() when the application shuts down.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: key=" + record.key() + ", value=" + record.value());
    }
}
Protocol Stack and Clients: Kafka uses a TCP/IP-based protocol stack for communication between producers, consumers, and brokers. The Kafka clients provide libraries and APIs in various programming languages, including Java, Python, and others, to interact with Kafka clusters. These clients enable developers to build Kafka producers, consumers, and stream processing applications.
Setup Kafka
To start up Kafka, you'll need to follow several steps, including setting up the necessary environment, starting the ZooKeeper server, and starting the Kafka broker(s). Here's a high-level explanation of the steps involved in starting Kafka:
Prerequisites: Ensure that you have Java installed on your system, as Kafka requires Java to run. You can check your Java installation by running the java -version command in your terminal.
Download Kafka: Go to the Apache Kafka website (https://kafka.apache.org/downloads) and download the latest stable version of Kafka. Extract the downloaded archive to a directory of your choice.
Start ZooKeeper: ZooKeeper is a centralized service used by Kafka for maintaining configuration data, managing distributed coordination, and leader election. Kafka depends on ZooKeeper for its operations. Start ZooKeeper by running the following command in a terminal: bin/zookeeper-server-start.sh config/zookeeper.properties
This will start the ZooKeeper server on the default port 2181.
Configure Kafka:
Navigate to the Kafka directory and modify the server configuration file (config/server.properties) based on your requirements. You can adjust settings such as port numbers, log directories, replication factors, and more.
Start Kafka Broker(s):
To start the Kafka broker, run the following command in a new terminal:
bin/kafka-server-start.sh config/server.properties
This will start a single Kafka broker using the configuration specified in server.properties.
Verify Kafka Startup: Check the terminal where the Kafka broker was started to ensure that there are no errors. The broker will output logs indicating its successful startup.
Kafka is now up and running, and you can start using it for producing and consuming data streams.
Create Topics: To create Kafka topics, use the kafka-topics.sh script provided with Kafka. For example, to create a topic named "my_topic" with three partitions and a replication factor of one, run the following command:
bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
You can adjust the topic name, partition count, and replication factor as per your requirements.
Serialization Formats
Serialization formats are formats or protocols that define how data is structured, encoded, and transformed into a format suitable for transmission or storage. They facilitate the conversion of complex data structures or objects into a portable and compact representation that can be easily transmitted across networks or persisted in storage systems.
Serialization is the process of converting objects or data structures into a byte stream or a textual representation that can be easily stored, transmitted, or reconstructed. Deserialization is the reverse process of reconstructing the original object or data structure from the serialized form.
Serialization formats provide a standardized way of encoding data, ensuring that data can be correctly serialized and deserialized across different systems, platforms, and programming languages. They define rules for representing data types, structures, relationships, and other relevant metadata.
Common serialization formats include:
- Binary Formats: Binary serialization formats represent data in binary form, which is more compact and efficient for storage and transmission. Examples include Apache Avro, Protocol Buffers (Protobuf), Apache Thrift, and MessagePack.
- Text-Based Formats: Text-based serialization formats encode data using human-readable text, making them easier to inspect and understand. Examples include JSON (JavaScript Object Notation), XML (eXtensible Markup Language), and YAML (YAML Ain't Markup Language).
- Specialized Formats: There are also specialized serialization formats designed for specific use cases, such as image formats (JPEG, PNG), document formats (PDF, DOCX), and geospatial formats (GeoJSON, Shapefile).
Serialization formats often provide features such as:
- Data Type Representation: Defining how different data types (strings, numbers, booleans, dates, etc.) are represented in the serialized form.
- Schema Definition: Allowing the definition and enforcement of schemas that describe the structure and validation rules for serialized data.
- Versioning and Evolution: Supporting schema evolution by allowing the introduction of changes to data structures while maintaining backward or forward compatibility.
- Compression: Enabling compression of serialized data to reduce storage or transmission size.
- Interoperability: Allowing data to be serialized and deserialized across different platforms, systems, or programming languages.
The choice of serialization format depends on factors such as the specific use case, performance requirements, interoperability needs, ecosystem support, and compatibility with existing systems.
AVRO
AVRO is a data serialization framework developed by Apache. It provides a compact, efficient, and schema-based binary data format. AVRO supports rich data structures, dynamic typing, and schema evolution. It is commonly used in Big Data processing frameworks like Apache Hadoop, Apache Spark, and Apache Kafka.
AVRO uses schemas defined in JSON format to describe the structure of data. An AVRO schema defines the fields, data types, and nesting structure of a record.
AVRO supports primitive data types such as null, boolean, int, long, float, double, bytes, and string.
AVRO also supports complex data types, including record (structured objects with named fields), enum (enumerated types), array (ordered collection of elements), map (key-value pairs), union (multiple possible types for a field), and fixed (fixed-length binary data).
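As a brief sketch, an AVRO schema is defined in JSON and can be used to build a record with the Avro Java library (the schema and field names here are made up for illustration):
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// A record schema with a string field and a nullable int field (a union with null).
String schemaJson = "{"
    + "\"type\": \"record\", \"name\": \"User\", \"fields\": ["
    + "  {\"name\": \"name\", \"type\": \"string\"},"
    + "  {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}"
    + "]}";

Schema schema = new Schema.Parser().parse(schemaJson);

GenericRecord user = new GenericData.Record(schema);
user.put("name", "Alice");
user.put("age", 30);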
Protobuf
Protocol Buffers, also known as Protobuf, is a language-agnostic binary serialization format developed by Google. It is designed for efficient data storage and transmission, especially in high-performance and bandwidth-constrained environments. Protobuf uses a language-neutral schema definition (.proto files) from which serialization and deserialization code is generated for each target language.
Apache Thrift
Thrift is a cross-language serialization and RPC (Remote Procedure Call) framework developed by Apache. It allows efficient communication between systems written in different programming languages. Thrift uses an interface definition language (IDL) to define data types and services, and it generates code for serialization, deserialization, and remote method invocation.
Both Protobuf and Thrift offer similar features, including schema evolution, efficient binary encoding, and support for multiple programming languages. They provide a compact binary format, making them suitable for high-performance and cross-platform communication.
Each serialization format has its own strengths and use cases. The choice of format depends on factors such as compatibility requirements, performance considerations, ecosystem support, and integration with existing systems.
It's important to note that AVRO, Protobuf, and Thrift are just a few examples of serialization formats. There are other popular formats like JSON, XML, and MessagePack, each with its own characteristics and usage scenarios.
Schema Registry
Schema Registry is a component that works in conjunction with Apache Kafka and provides a centralized schema management system. It allows you to store, retrieve, and manage schemas for data serialization in a distributed streaming environment.
Subject Name Strategy
The subject name strategy in Schema Registry determines how subjects are named when schemas are registered. A subject is the scope under which versions of a schema are registered and checked for compatibility; with the default strategy it is derived from the Kafka topic name (for example, my_topic-value for a topic's value schema).
Some commonly used strategies are:
- Topic Name Strategy: The subject name is derived from the Kafka topic name. Each topic has its own subject, and schemas are registered under their corresponding topics.
- Record Name Strategy: The subject name is based on the name of the record defined in the schema. This strategy allows multiple topics to share the same schema if they have the same record name.
The choice of subject name strategy depends on the specific requirements of your data model and how you want to manage schemas for your topics.
Schema Distribution
Schema Registry provides a centralized location for storing and distributing schemas. It allows producers and consumers to fetch and validate schemas during serialization and deserialization.
When a producer wants to send data to a Kafka topic, it retrieves the schema for the topic from the Schema Registry. It then serializes the data according to the schema and publishes it to the topic. Consumers, on the other hand, fetch the schema from the Schema Registry and use it to deserialize the data received from Kafka.
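A minimal sketch of this flow on the producer side, assuming Confluent's Avro serializer and a Schema Registry listening on a placeholder URL:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// The Avro serializer registers and fetches schemas from the Schema Registry automatically.
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
// Optional: switch from the default topic name strategy to the record name strategy.
props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my_topic", "my_key", user)); // 'user' is an Avro GenericRecord, as in the AVRO example above
producer.close();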
Schema Registry supports schema versioning and evolution. It allows you to register multiple versions of a schema for a subject, ensuring compatibility with older data while accommodating schema changes. Consumers can specify the desired version of the schema they want to use during deserialization.
Schema Registry also provides compatibility checks to ensure that schema changes are backward or forward compatible, allowing for seamless data evolution across different versions of schemas.
Schema Registry integrates with different serializers and data formats, such as AVRO, JSON, Protobuf, and more, enabling seamless interoperability between different data producers and consumers in a Kafka ecosystem.
By centralizing schema management, Schema Registry simplifies schema governance, ensures data compatibility, and facilitates data integration and interoperability in distributed streaming applications.
Kafka Streams
Kafka Streams is a lightweight Java library provided by Apache Kafka for building real-time stream processing applications. It allows developers to process and analyze continuous streams of data in a scalable and fault-tolerant manner using Kafka as the underlying data storage and messaging system.
Kafka Streams provides a high-level API for building stream processing applications. It enables developers to consume input streams from Kafka topics, perform various operations on the data, and produce output streams to new Kafka topics. Kafka Streams handles the complexities of distributed processing, fault tolerance, state management, and event time processing.
Kafka Streams supports a variety of stream processing operations, including:
- Filter: It allows you to filter records from the input stream based on specified conditions.
- Map: It enables transforming each record in the input stream into a new record with a modified value.
- Count: It counts the number of records per key in a grouped stream, producing a continuously updated table of counts.
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");

KStream<String, String> filteredStream = input.filter((key, value) -> value.startsWith("A"));
KStream<String, String> mappedStream = filteredStream.mapValues(value -> value.toUpperCase());
KTable<String, Long> countTable = mappedStream.groupByKey().count();

countTable.toStream().foreach((key, value) -> System.out.println(key + ": " + value));

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Stream vs Table
In Kafka Streams, there are two primary data abstractions: streams and tables.
- Stream: A stream represents an unbounded sequence of events. It is a continuously updating data stream where new events are appended over time. Streams are suitable for operations such as filtering, mapping, aggregation, and windowing.
- Table: A table is a continuously updated view of the latest state of a stream. It represents a snapshot of the stream data at a particular time. Tables are suitable for operations such as joins, aggregations, and lookups.
When working with streams, the backing topics are typically configured with the delete cleanup policy (records expire after the retention period), while topics backing tables are typically configured with the compact cleanup policy so that only the latest value for each key is retained.
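A short sketch of how the two abstractions are declared in Kafka Streams (the topic names are placeholders):
StreamsBuilder builder = new StreamsBuilder();

// Stream: every record is an independent event appended to an unbounded sequence.
KStream<String, String> pageViews = builder.stream("page-views");

// Table: keeps only the latest value per key, typically backed by a compacted topic.
KTable<String, String> userProfiles = builder.table("user-profiles");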
Stateless and Stateful Processors
In Kafka Streams, there are two types of processors:
- Stateless Processors: These processors do not maintain any internal state. They operate on individual events independently and produce results solely based on the current event being processed. Stateless operations include branch, filter, inverse filter, map, flatMap, foreach, peek, groupBy, and merge.
- Stateful Processors: These processors maintain internal state and produce results based on the accumulated state over a period of time or across multiple events. Stateful operations include aggregations, count, joins, windowing, and custom processors. Stateful processors store their state in the local state store or distributed state stores for fault tolerance and scalability.
Stateful processors in Kafka Streams leverage the internal changelog topics to store and maintain the state.
Stateless Operations
branch: The branch operation allows you to create multiple output streams based on specified conditions. Each condition represents a predicate, and records are routed to the corresponding output stream based on the condition.
KStream<String, String>[] branches = input.branch(
(key, value) -> value.startsWith("A"),
(key, value) -> value.startsWith("B"),
(key, value) -> value.startsWith("C")
);
filter: The filter operation allows you to selectively filter records from the input stream based on a condition.
KStream<String, String> filteredStream = input.filter((key, value) -> value.contains("important"));
inverse filter: The inverse filter operation filters out records that do not match the specified condition.
KStream<String, String> inverseFilteredStream = input.filterNot((key, value) -> value.contains("unimportant"));
map: The map operation allows you to transform each record in the input stream into a new record with a modified value.
KStream<String, String> mappedStream = input.mapValues(value -> value.toUpperCase());
flatMap: The flatMap operation enables you to transform each input record into zero or more output records, expanding or splitting the records based on the transformation logic.
KStream<String, String> flattenedStream = input.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
foreach: The foreach operation applies a function to each record in the stream without modifying the stream itself. It is typically used for performing side effects, such as logging or sending data to an external system.
input.foreach((key, value) -> System.out.println("Record: " + key + ", " + value));
peek: The peek operation allows you to perform an action on each record in the stream while forwarding the records to the downstream processors or sinks. It is useful for debugging or monitoring purposes.
KStream<String, String> peekStream = input.peek((key, value) -> System.out.println("Processing record: " + key + ", " + value));
groupBy: The groupBy operation groups records in the input stream based on a specific key and produces a new stream with the grouped records.
KGroupedStream<String, String> groupedStream = input.groupBy((key, value) -> key);
merge: The merge operation combines multiple input streams into a single output stream by merging the records from each input stream.
KStream<String, String> mergedStream = inputStream1.merge(inputStream2).merge(inputStream3);
Stateful Operations
aggregations: Aggregations allow you to compute aggregated results based on key-grouped records in a stream. Examples of aggregations include sum, average, minimum, maximum, etc.
// Assumes an input stream with Integer values so the adder can sum them.
KTable<String, Integer> aggregatedTable = input.groupByKey()
    .aggregate(
        () -> 0,
        (key, value, aggregate) -> aggregate + value,
        Materialized.as("aggregated-table")
    );
count: The count operation counts the number of records per key in the grouped stream.
KTable<String, Long> countTable = input.groupByKey().count();
joins: Joins allow you to combine records from multiple streams based on a common key.
KStream<String, String> joinedStream = stream1.join(stream2,
(leftValue, rightValue) -> leftValue + "-" + rightValue,
JoinWindows.of(Duration.ofMinutes(5))
);
windowing: Windowing allows you to group records in a stream based on time intervals or specific event counts.
KTable<Windowed<String>, Long> windowedCounts = input.groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
    .count();
custom processors:
Custom processors allow you to define and apply your own processing logic by implementing the Processor or Transformer interface.
class MyProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // Custom processing logic
        String processedValue = value + "-processed";
        context.forward(key, processedValue);
    }

    @Override
    public void close() {
        // Clean-up logic
    }
}

// With this classic Processor interface, process() is a terminal operation;
// use transform()/transformValues() if you need to continue operating on the stream.
input.process(() -> new MyProcessor());
KSQL
KSQL is an open-source streaming SQL engine for Apache Kafka. It allows you to process, analyze, and transform data in real-time using familiar SQL syntax. With KSQL, you can write SQL queries to create streams and tables, apply filtering and transformations, perform aggregations, and more.
DDL (Data Definition Language) Operations
CREATE STREAM: The CREATE STREAM statement is used to define a new stream in KSQL. It specifies the schema and configuration details for the stream.
CREATE STREAM my_stream (id INT, name STRING) WITH (KAFKA_TOPIC='my_topic', VALUE_FORMAT='AVRO');
DROP STREAM: The DROP STREAM statement is used to remove an existing stream from KSQL.
DROP STREAM my_stream;
CREATE TABLE: The CREATE TABLE statement is used to define a new table in KSQL. It specifies the schema and configuration details for the table.
CREATE TABLE my_table (id INT, name STRING) WITH (KAFKA_TOPIC='my_topic', VALUE_FORMAT='AVRO', KEY='id');
DROP TABLE: The DROP TABLE statement is used to remove an existing table from KSQL.
DROP TABLE my_table;
CREATE TABLE AS SELECT: The CREATE TABLE AS SELECT statement creates a new table based on the result of a SELECT query. It allows you to materialize the output of a query as a new table.
CREATE TABLE my_new_table AS SELECT id, name FROM my_stream;
DML (Data Manipulation Language) Operations
SELECT: The SELECT statement is used to query data from streams or tables in KSQL. It allows you to perform filtering, transformations, aggregations, and joins on the data.
SELECT * FROM my_stream WHERE id > 100;
INSERT INTO: The INSERT INTO statement is used to insert data into a target stream or table in KSQL.
INSERT INTO my_stream (id, name) VALUES (1, 'John');
INSERT INTO my_stream (id, name) VALUES (2, 'Jane');