What is Kafka?
Kafka is an event streaming platform used to collect and process data streams at scale. It was initially developed at LinkedIn in 2011 and is now open source under the Apache Software Foundation. It is a JVM application written in Java and Scala.
Notable Companies Using Kafka:
LinkedIn: Kafka originated at LinkedIn. They use it for tracking operational metrics, monitoring, and event sourcing.
Netflix: They use Kafka for real-time monitoring and event processing.
Uber: Uses Kafka for gathering metrics from its many services spread across its transportation platform.
Spotify: Employs Kafka for tracking user activity and updating playlists in real-time.
Events
Simply put, an event is “a thing that happened”; it is a combination of notification and state. Notable examples include:
Internet of Things (e.g., my thermostat reads 80)
User Interaction (User hovers over a black dress)
Microservice output
Business process change
Kafka’s data model for events is a key-value pair:
Event Key: “20393” (Representing a user ID)
- Usually represented as a string or integer
Event Value: “Hovered over a black dress for 3 seconds”
- Usually represented as one of JSON, Avro, Protocol Buffers
*Note that I use the terms event and message interchangeably
Topic
Now we need a way to store all these events. You can think of topics as named containers for similar events; they are really just append-only logs that hold events. Every event must be sent to a topic: for example, we can create a topic named “user_metadata” and send our event to it. Important things to note about topics:
Events in a topic are immutable; once written, they cannot be changed or destroyed (they only expire via retention)
Topics are durable; the retention period is configurable
Consumers can only seek by offset; topics are not indexed (this is part of what makes Kafka so fast! See the toy sketch below)
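To make the append-only, seek-by-offset idea concrete, here is a tiny toy sketch of my own (an analogy only, not Kafka’s actual implementation): every append gets the next offset, and readers ask for an offset rather than searching by key.

package main

import "fmt"

// A toy append-only log: events are only ever added to the end,
// and each event is addressed by its offset (its position in the log).
type Log struct {
    events []string
}

// Append adds an event to the end and returns its offset.
func (l *Log) Append(event string) int {
    l.events = append(l.events, event)
    return len(l.events) - 1
}

// Read seeks directly to an offset instead of searching by key.
func (l *Log) Read(offset int) string {
    return l.events[offset]
}

func main() {
    var topic Log
    topic.Append("user 20393 hovered over a black dress") // offset 0
    off := topic.Append("thermostat read 80")             // offset 1
    fmt.Println(topic.Read(off))
}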
Producer
A Producer is a client application that publishes messages to Kafka. A producer will set the Key and Value of an event and send it to a specific topic.
Consumer
A Consumer is also a client application, one that reads and processes events from Kafka. A Consumer must subscribe to a topic in order to receive that topic’s events. There can be many consumers for a single topic.
Broker
A Broker refers to a single Kafka server instance in a Kafka cluster. The broker is responsible for receiving data (writes) from producers, storing this data, and serving it to consumers for reads. The broker also manages the offset (or position) for consumer groups.
This is all we need to know to get started; however, I would like to quickly mention a couple of other core Kafka components that we will not get to in this project:
Partitions: Segments within a Kafka topic that allow data to be distributed and processed in parallel.
Replicas: Copies of a Kafka partition that provide fault tolerance and data redundancy, ensuring data availability even if a server fails.
Consumer groups: A set of consumers working together to consume and process data from one or multiple topics, ensuring each message is processed once and only once by one member of the group.
KRaft: Kafka’s built-in way to manage its cluster metadata without relying on an external coordinator (ZooKeeper), making it simpler and more self-contained.
Project Time
Today we will create a Go project simulating a real use case for Kafka: real-time fraud detection. To get started, make sure you have Go installed, then create a directory where you want your project to live.
mkdir kafka-fraud-detection
Here we will run Kafka in a Docker container that can easily be spun up. Use the gist below as the content for your docker-compose.yaml file.
%[https://gist.github.com/frankie-mur/d1591e3f347a76998de9b1c7d3293d7f]
Then start the container:
docker-compose up
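If you want to double-check that the container came up before moving on (optional, and assuming the compose file from the gist defines the Kafka service), you can list the compose services:

docker-compose ps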
Project Structure
Great, now we have Kafka running in a Docker container with its port forwarded to localhost, which will allow us to connect to it. Let’s set up the project structure; we want to create a couple of folders:
mkdir -p cmd/producer cmd/consumer pkg/models pkg/bankaccount
Finally, we want to initialize Go modules and install external packages. We will be using sarama to establish a connection to our Kafka broker and faker to generate bank account creation events.
go mod init kafka-fraud-detection
go get github.com/IBM/sarama github.com/bxcodec/faker/v4
Create our Models and Faker Data
Let’s start by setting up our BankAccount model and an associated function to generate fake bank account events. Let’s create a file pkg/models/models.go:
package models

type BankAccount struct {
    FirstName        string  `faker:"first_name"`
    LastName         string  `faker:"last_name"`
    CreditCardNumber string  `faker:"cc_number"`
    Amount           float64 `faker:"amount"`
}
Great, this simulates bank account data that a consumer will need to verify. Now let’s create that function in /pkg/bankaccount/BankAccount.go:
package bankaccount

import (
    "fmt"

    "github.com/bxcodec/faker/v4"
    "github.com/frankie-mur/go-kafka/pkg/models"
)

func GenerateBankAccount() (*models.BankAccount, error) {
    bankAccount := models.BankAccount{}
    err := faker.FakeData(&bankAccount)
    if err != nil {
        return nil, fmt.Errorf("failed to generate bank account: %w", err)
    }
    return &bankAccount, nil
}
This function uses our annotated BankAccount struct and the faker library to generate fake data and return that data.
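If you want to sanity-check the generator before wiring up Kafka, a minimal test like the following works. This file is my own addition, not part of the original project:

package bankaccount

import "testing"

// Quick sanity check that fake bank account data is generated without error.
func TestGenerateBankAccount(t *testing.T) {
    account, err := GenerateBankAccount()
    if err != nil {
        t.Fatalf("expected no error, got %v", err)
    }
    if account.FirstName == "" || account.CreditCardNumber == "" {
        t.Errorf("expected populated fields, got %+v", account)
    }
    t.Logf("generated account: %+v", account)
}

Drop it next to BankAccount.go and run it with go test ./pkg/bankaccount.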
Setting up our Producer
Now that we have all that set up, let’s get to the fun part: setting up our Producer and Consumer. We will start with our Producer by creating a new file in cmd/producer/producer.go and adding the following:
package main

import (
    "fmt"

    "github.com/IBM/sarama"
)

const (
    KafkaServerAddress = "localhost:9092"
    KafkaTopic         = "bankaccount"
)

func setupProducer() (sarama.SyncProducer, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer([]string{KafkaServerAddress}, config)
    if err != nil {
        return nil, fmt.Errorf("failed to setup producer: %w", err)
    }
    return producer, nil
}
Let’s break down this code a bit:
- Define a function setupProducer that creates and configures a Kafka producer client. This function returns a SyncProducer if it succeeds or an error if it fails.
- Create a sarama.Config struct to hold producer configuration and set Producer.Return.Successes to true so the producer will return success metadata for each message.
- Call sarama.NewSyncProducer to create a new synchronous Kafka producer, passing the broker address and config. It returns the producer instance and any error from producer creation.
Simple enough. Now let’s create our main function that will call the function we created above:
func main() {
    p, err := setupProducer()
    if err != nil {
        log.Fatalf("Failed to setup producer: %v", err.Error())
    }
    defer p.Close()
}
- Create our Kafka producer and assign it to the variable p
- Handle any errors creating the producer
- Defer closing the producer connection p until the function exits
Now let’s create our message-sending function!
Now that we have a producer and can send messages, let’s abstract that a bit and create another function that handles sending them:
func sendKafkaMessage(producer sarama.SyncProducer, topic string, message []byte) error {
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(message),
    }
    _, _, err := producer.SendMessage(msg)
    return err
}
- Define a function sendKafkaMessage that sends a message to Kafka using a provided producer. It accepts the producer, topic name, and message payload as parameters.
- Create a sarama.ProducerMessage struct with the topic name and message payload.
- Use the producer.SendMessage method to send the message to Kafka synchronously.
- Note: as you can see, we only send a value. What about the key? In Kafka, sending a key is optional, and in our use case we omit it (the sketch after this list shows how a key could be added).
- Return any error from the send operation.
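If you did want to key your events (say, by user ID so all of a user’s events land on the same partition), a variant of the function above could set the Key field. This is my own sketch, not part of the original project; it assumes the same producer.go file where sarama is already imported:

// Hypothetical variant of sendKafkaMessage that also sets a message key.
// Messages with the same key are always routed to the same partition.
func sendKeyedKafkaMessage(producer sarama.SyncProducer, topic, key string, message []byte) error {
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Key:   sarama.StringEncoder(key),
        Value: sarama.ByteEncoder(message),
    }
    _, _, err := producer.SendMessage(msg)
    return err
}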
Now back to our main function to add some code….
func main() {
    p, err := setupProducer()
    if err != nil {
        log.Fatalf("Failed to setup producer: %v", err.Error())
    }
    defer p.Close()

    size := 10
    for i := 0; i < size; i++ {
        // Generate fake bank account data
        bank, err := bankaccount.GenerateBankAccount()
        if err != nil {
            fmt.Printf("Failed to generate bank account: %v", err.Error())
            continue
        }
        // Convert the bank account data to []byte
        data, err := json.Marshal(bank)
        if err != nil {
            fmt.Printf("Failed to marshal bank account: %v", err.Error())
            continue
        }
        // Send the message
        err = sendKafkaMessage(p, KafkaTopic, data)
        if err != nil {
            fmt.Printf("Failed to send message: %v", err.Error())
        }
    }
}
- Configure a size of 10, to send 10 messages (or events) to Kafka.
- Generate random fake bank account data using the bankaccount.GenerateBankAccount() function we created.
- We must convert our bank data to []byte to send it as a message in Kafka, so we use json.Marshal(bank) to do that for us (basically just converting our struct to stringified JSON).
- Finally, we send the message and check for errors.
Done! Now that we can produce events, let’s create a Consumer to consume them!
Setting up our Consumer
Similar to our producer.go, we will start by writing a function to connect to Kafka as a consumer. Let’s create a new file in cmd/consumer/consumer.go:
package main

import "github.com/IBM/sarama"

const (
    KafkaServerAddress = "localhost:9092"
    KafkaTopic         = "bankaccount"
)

func connectConsumer() (sarama.Consumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    // Create new consumer
    conn, err := sarama.NewConsumer([]string{KafkaServerAddress}, config)
    if err != nil {
        return nil, err
    }
    return conn, nil
}
This code:
- Defines constants for the Kafka server address and topic name.
- Defines a connectConsumer function to create a Kafka consumer client.
- Creates a sarama.Config with these settings:
  - Consumer.Return.Errors set to true will return any errors that occur while consuming an event.
  - Consumer.Offsets.Initial set to sarama.OffsetNewest sets a newly created consumer's offset to the newest message. This makes sure the consumer only consumes events that happen during or after its creation.
- Calls sarama.NewConsumer to create the client, passing the broker address and config.
- Returns the new consumer instance and any error.
Great, now let’s create our main function
func main() {
    worker, err := connectConsumer()
    if err != nil {
        panic(err)
    }

    // Calling ConsumePartition. It will open one connection per broker
    // and share it for all partitions that live on it.
    consumer, err := worker.ConsumePartition(KafkaTopic, 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }
    fmt.Println("Consumer started ")

    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    // Get signal for finish
    doneCh := make(chan struct{})
}
Some things to point out here:
- We call worker.ConsumePartition() to start consuming messages from partition 0 of the KafkaTopic, starting from the newest offset (the sketch after this list shows how to replay older events instead).
- We create the sigchan channel to receive OS signals like Ctrl+C, so we can gracefully shut down or log errors.
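As a side note (my own addition, not in the original project): because a topic keeps events for its configured retention period, you could replay everything still on the log by asking ConsumePartition to start from the oldest offset instead of the newest:

// Start reading from the beginning of the retained log instead of only new events.
consumer, err := worker.ConsumePartition(KafkaTopic, 0, sarama.OffsetOldest)
if err != nil {
    panic(err)
}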
Now we need to handle events when the consumer receives them:
func main() {
    ...
    // Spin up a goroutine to fetch messages from Kafka
    // and handle them, while also monitoring for shutdown signals
    go func() {
        for {
            select {
            case err := <-consumer.Errors():
                fmt.Println(err)
            case msg := <-consumer.Messages():
                // Handle the message received from Kafka
                err := handleMessage(msg)
                if err != nil {
                    fmt.Printf("Failed to handle message for consumer with key: %v, with error: %v\n", msg.Key, err)
                }
            case <-sigchan:
                fmt.Println("Interrupt is detected")
                doneCh <- struct{}{}
            }
        }
    }()

    <-doneCh
    fmt.Println("Processed all messages, awaiting more...")
    if err := worker.Close(); err != nil {
        panic(err)
    }
}
Without going too much into goroutines: when our consumer receives an error or a message, it will be caught in our select statement. Let’s define our `handleMessage` function:
func handleMessage(msg *sarama.ConsumerMessage) error {
    fmt.Printf("Received message | Topic(%s) | Message(%s) \n", msg.Topic, string(msg.Value))
    // Parse the message back into our BankAccount struct
    bankAccount := models.BankAccount{}
    err := json.Unmarshal(msg.Value, &bankAccount)
    if err != nil {
        return fmt.Errorf("failed to parse message: %w", err)
    }
    // Call our fake Veritas API
    succeeded := callVeritas(bankAccount)
    if !succeeded {
        fmt.Printf("--UNAUTHORIZED-- bank account for user %s\n", bankAccount.FirstName)
    } else {
        fmt.Printf("--AUTHORIZED-- bank account for user %s\n", bankAccount.FirstName)
    }
    return nil
}

// Simulate calling an actual verification service,
// randomly returning true or false
func callVeritas(bankAccount models.BankAccount) bool {
    fmt.Printf("Calling Veritas for %s bank...\n", bankAccount.FirstName)
    return rand.Intn(2) == 0
}
- Notice we Unmarshal msg.Value (remember, these are key-value pairs!) into our BankAccount struct, call our fake authorization API that we named Veritas, and print out the results.
Finished! We now have both our Producer and our Consumer, let’s use them.
Let’s start by running our consumer: go to the root of the project and run the command below in your terminal.
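Assuming the project layout above, that command would be something like:

go run cmd/consumer/consumer.go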
Our consumer is now running and listening for events on its subscribed topic… in our case, the topic bankaccount.
Now let’s send some events! Open a new terminal (or split) and once again go to the root of the project and run the command
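Again assuming the layout above, something like:

go run cmd/producer/producer.go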
Did you see that?! If you were watching the Consumer terminal you would see something similar to this
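The exact names and amounts will differ since the data is random, but based on the Printf calls in handleMessage the consumer output looks roughly like:

Received message | Topic(bankaccount) | Message({"FirstName":"Jimmie","LastName":"Swaniawski","CreditCardNumber":"3558018114425614","Amount":644.78})
Calling Veritas for Jimmie bank...
--UNAUTHORIZED-- bank account for user Jimmie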
We did it! If you were following along, here is a quick overview of the entire flow:
- Our producer sent an event to Kafka with the topic bankaccount and the value {"FirstName":"Jimmie","LastName":"Swaniawski","CreditCardNumber":"3558018114425614","Amount":644.78}
- Our consumer is subscribed to the topic bankaccount, therefore it receives our message.
- The message is unmarshalled and sent to our (fake) authorization service Veritas.
- It is verified (or in our case, unauthorized) and the result is printed to the console.
source code: https://github.com/frankie-mur/go-kafka-fraud-detection/tree/main