Kafka – Part 3 – Stream service in Pega
In this blog article, we will see how Pega uses Kafka internally to process streams of data, along with an overview of the stream service.
This tutorial is implemented using Pega personal edition 8.4, but the core concepts remain the same in higher versions as well.
It is recommended to go through the previous blog articles on Kafka fundamentals before proceeding here.
What is stream service?
– Stream service was introduced in Pega 7.4.
– Stream service is built on Apache Kafka, which helps in publishing and subscribing to streams of data, storing streams of data, and processing the same data in real time.
As we saw in the previous introduction article, to make use of the Kafka service, you need the Kafka server and the Zookeeper server up and running.
How do you start the Kafka and Zookeeper in Pega?
Starting the stream service is what starts the internal Kafka and Zookeeper.
So, how do you start the stream service?
First, let’s talk about node classification. You can classify the nodes or servers based on their usage.
Node type = stream; This node type is responsible for Kafka-based stream processing.
I recommend going through the below article for more information on node classification.
1. Stream service gets automatically started whenever a server (node) of type stream is started.
Note: In earlier Pega 8 versions, if existing nodes did not leave the Kafka cluster cleanly or failed to join it, restarting the server would not start the stream service automatically.
We know that, in Pega personal edition, the node type is configured as ‘WebUser, BackgroundProcessing, Search, Stream’.
So by default, restarting (or starting) the server should automatically start the stream service.
Restart the server. You will see a lot of log entries that are related to Kafka server start-up.
Only one Pega personal edition server joined the Kafka cluster as a broker (and is also the leader). The replication factor is also 1, because the personal edition always comes with a single server.
The Kafka configurations and data are available in the below folders.
To check the server properties, switch to the Kafka server folder inside the personal edition Tomcat folder:
C:\PRPCPersonalEdition\tomcat\kafka-1.1.0.4\config
There you will see all the default configurations.
Important note: all these server and Zookeeper properties files are auto-generated when the stream service starts.
Click and open the server.properties file.
You will see all the server configuration details here.
You may get a question: how do you override these settings?
Remember, Pega generates these files automatically when you start the stream service. So it is pretty clear that updating a property file manually will not help, because these files are always re-generated with default values.
Pega provides an option to override the default server and broker properties using DSS.
We will try our hand at this at the end of the tutorial.
How do we make sure stream service is started?
Step 1: Check the stream services landing page.
Switch to Designer Studio -> Configure -> Decisioning -> Infrastructure -> Services -> Stream.
You will see the stream service status as “NORMAL” for the only node on the local IP address – 192.168.2.5
Now it is clear that the stream service is up and running, but how do we make sure Kafka and Zookeeper are running?
Again, Kafka listens on port 9092 and Zookeeper on 2181.
Check the port status.
Step 1: Open the cmd window using run as administrator.
Step 2: Execute the command: netstat -ab
This command will list all the listening ports in your machine.
You see the Zookeeper port 2181 is occupied.
Similarly, the Kafka port 9092 is also occupied.
Step 3: Stop the stream service manually using the landing page execute option.
You see the Stream service is stopped.
Step 4: Now check the listening ports again using the command prompt: netstat -ab. Search through the results, and you will find the ports are no longer listening 🙂
So it is pretty clear that when a stream service is started – Kafka and Zookeeper are started automatically in their default ports.
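If you prefer scripting the port check instead of scanning the netstat output by hand, a small helper like the one below does the same verification. This is a plain-Python sketch, assuming the default ports 2181 and 9092 on the local machine:

```python
import socket

def is_port_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if something is accepting TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# With the stream service running you would expect both to be True;
# after stopping it via the landing page, both should turn False.
print("Zookeeper (2181):", is_port_listening("127.0.0.1", 2181))
print("Kafka     (9092):", is_port_listening("127.0.0.1", 9092))
```

The same function can be reused later in the tutorial after changing a port via DSS, by simply passing the new port number.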
So how do we change the default Kafka server and Zookeeper settings?
As with other configuration settings, we override them using DSS.
For the tutorial, we will try to change the Zookeeper port from 2181 to 2182.
Step 1: Create a new DSS
Owning Ruleset – Pega-Engine
Setting purpose – prconfig/dsm/services/stream/pyKeeperPort/default
Step 2: Set the value as 2182
Step 3: Restart the server for the DSS configuration to take effect.
On the command prompt, check the listening ports again: netstat -ab
You see that port 2182 is now occupied.
Also the stream service should be up and running again!
For other configuration settings, please visit the below Pega link.
Coming back to the question, how do we start the stream service?
2. In earlier Pega 8 versions, you get a button to add nodes manually on the stream services landing page.
Okay, now what are the infrastructure requirements for running stream service or Kafka?
I recommend going through the knowledge article on Streams overview. You will get a basic understanding of disk space, compression settings, etc. for the stream service.
Let’s see two main differences between the normal Kafka implementation (which we saw in the Kafka part 2 post) and the internal Kafka implementation packaged with Pega.
1. Zookeeper Library
Pega uses a project called ‘Charlatan’. It is Pega’s own library that substitutes for the real Zookeeper in the Kafka implementation.
You will find the source files for Charlatan in the Pega Git repository:
https://github.com/pegasystems/Charlatan
We know that Zookeeper’s main responsibility is to maintain metadata about Kafka brokers, topics, etc.
So how does this charlatan server maintain the metadata?
It is the usual answer 😉 yes, there is a dedicated table to store the Zookeeper metadata.
Table name – pr_data_stream_nodes
View the rows in the corresponding table.
You see there is one Kafka cluster and one Kafka broker. You also see the metadata for all the Kafka topics.
So the understanding is that the Charlatan server uses this table to maintain the metadata.
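To make the idea concrete, here is a minimal sketch of how a Zookeeper-style lookup could be answered from a relational table instead of a real Zookeeper ensemble. It uses an in-memory SQLite database as a stand-in; the real pr_data_stream_nodes schema in Pega’s database differs, and the column names and path layout here are assumptions for illustration only:

```python
import sqlite3

# Illustrative stand-in for pr_data_stream_nodes. The real table schema
# in Pega's database is different; these column names are assumptions.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE pr_data_stream_nodes (
        pznodepath TEXT,   -- Zookeeper-style path, e.g. /brokers/ids/0
        pznodedata TEXT    -- serialized metadata stored at that path
    )
""")
conn.execute(
    "INSERT INTO pr_data_stream_nodes VALUES (?, ?)",
    ("/brokers/ids/0", '{"host":"192.168.2.5","port":9092}'),
)

# A Charlatan-style lookup: answer a Zookeeper path read from the table.
row = conn.execute(
    "SELECT pznodedata FROM pr_data_stream_nodes WHERE pznodepath = ?",
    ("/brokers/ids/0",),
).fetchone()
print(row[0])
```

The design point is simply that broker, cluster, and topic metadata live in an ordinary database table, so no separate Zookeeper process is needed.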
The next main difference is
2. Consumer offsets are not stored with the Kafka server.
What is consumer offset?
Copy-pasting the content from the Kafka introduction post:
Say a consumer reads messages from partition 0 up to offset 5. Now the consumer is taken down for maintenance purposes. When the consumer comes back up, from where should it read? Ideally, the consumer should resume reading after offset 5.
– Kafka internally stores the offsets at which the consumer group is reading.
– The offsets are committed to an internal Kafka topic named __consumer_offsets.
– Whenever a consumer in a group processes the data, then it should commit the offsets.
Consumers can choose when to commit the offsets.
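The commit-and-resume behavior described above can be sketched in a few lines of plain Python. No real Kafka client is involved; this only illustrates the offset semantics:

```python
# A partition is an ordered log; the index of each record is its offset.
partition = ["msg0", "msg1", "msg2", "msg3", "msg4", "msg5", "msg6"]
committed_offset = -1  # nothing committed yet

def consume(upto: int):
    """Process records up to and including `upto`, then commit that offset."""
    global committed_offset
    for offset in range(committed_offset + 1, upto + 1):
        _ = partition[offset]   # process the record
    committed_offset = upto     # commit only after processing succeeds

consume(5)  # read offsets 0..5 and commit offset 5

# ...the consumer goes down for maintenance and comes back up...
resume_from = committed_offset + 1
print("resume from offset", resume_from)  # → 6
```

Because the commit happens after processing, a crash before the commit means those records are read again on restart (at-least-once delivery).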
But with the Pega Kafka implementation, there is no __consumer_offsets topic used!!
In the directory C:\PRPCPersonalEdition\tomcat\kafka-data,
you will see no __consumer_offsets Kafka topic.
So the question is: how are the consumer offsets managed?
Again, the same usual answer: in a dedicated table!
Table name – pr_data_decision_df_part
You see that every OOTB queue processor rule (a consumer of Kafka data) maintains its offset in the pxrecordsProcessed column.
When a Kafka message is processed, Pega maintains the offset in this table!
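As a rough sketch of that idea, the snippet below models the offset lookup against an in-memory SQLite stand-in. Apart from the pxrecordsProcessed column, which the article mentions, the other column names and the processor name used here are illustrative assumptions, not the real Pega schema:

```python
import sqlite3

# Illustrative stand-in for pr_data_decision_df_part; real schema differs.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE pr_data_decision_df_part (
        pxflowname         TEXT,    -- the queue processor's data flow (assumed)
        pxpartitionid      INTEGER, -- Kafka partition number (assumed)
        pxrecordsprocessed INTEGER  -- acts as the committed consumer offset
    )
""")
conn.execute(
    "INSERT INTO pr_data_decision_df_part VALUES (?, ?, ?)",
    ("SampleQueueProcessor", 0, 41),  # hypothetical processor name
)

# On restart, the processor reads its last committed position back
# from the table rather than from a __consumer_offsets topic:
(offset,) = conn.execute(
    "SELECT pxrecordsprocessed FROM pr_data_decision_df_part "
    "WHERE pxflowname = ? AND pxpartitionid = ?",
    ("SampleQueueProcessor", 0),
).fetchone()
print("resume after record", offset)
```

The takeaway: both pieces of state that normally live inside Kafka/Zookeeper (cluster metadata and consumer offsets) are kept in ordinary Pega database tables.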
So the entire architecture between Pega and Kafka will look like this.
There is one more useful article on troubleshooting the stream service:
https://community.pega.com/knowledgebase/articles/decision-management-overview/troubleshooting-kafka
As a summary:
– The stream service helps in publishing and subscribing to streams of data, storing streams of data, and processing the same data in real time.
– Starting the stream service in turn starts the Kafka server and the Zookeeper service, and the broker joins the Kafka cluster. The Designer Studio -> Configure -> Decisioning -> Infrastructure -> Services -> Stream landing page shows the available stream services.
– Charlatan server – the library that substitutes for the Zookeeper functionality in Pega’s Kafka implementation.
– Table pr_data_stream_nodes stores the metadata for Kafka topics, brokers, clusters, etc. Table pr_data_decision_df_part stores the consumer offsets.