Queue processor – Usage and Configurations
In this post, we will look at the architecture behind the queue processor rule, along with all its configurations and usages.
This tutorial is implemented using Pega Personal Edition 8.4, but the core concepts remain the same in higher versions as well.
What is a queue processor rule?
– Used for background processing that relies on queue management to distribute and perform the work.
– The queue processor rule uses Kafka message queuing, so it needs at least one running stream service for processing.
– A dedicated data flow run (Kafka consumer) performs the message processing (running the queue processor activity).
– Queue processors replace standard agents.
Standard agents Vs Queue processors
Queue processors (substitute for standard agents) and job schedulers (substitute for advanced agents) were introduced in Pega 8.1.
In this blog article, we will talk only about queue processors; job schedulers deserve a separate article.
1. Database transactions
Standard agent processing primarily relies on database transactions for its queue processing.
All the queue entries for standard agents are persisted in their corresponding database tables.
For example: we know that System-Queue-ServiceLevel is the concrete class that holds the SLA queue instances, and ProcessEvents is the standard agent responsible for SLA processing.
This is one simple example of a standard agent: a dedicated class and a dedicated table to store the queue instances (scheduled and broken items).
We saw in the introduction article that queue processors use the Kafka stream service for processing.
With queue processors, dedicated Kafka topics are used to persist the messages.
Let’s check the OOTB queue processor instances.
Records -> SysAdmin -> Queue Processor
Important note: OOTB queue processor instance list varies with Pega versions.
You will see the partition list for the Kafka Topics (Kafka Topic = Queue Processor name) under the Kafka data folder.
For the queue processor pyFTSIncrementalIndexer, you see the partitions for the topic PYFTSINCREMENTALINDEXER (1 to 20).
Important note: The count 1-20 refers to the Kafka partitions in a topic. Pega changed the default partition count from 20 to 6 in Pega 8.6, but you also get a utility to increase or decrease the partition count if needed.
Now a question: do the queued messages never get persisted in the database?!
Go to the App explorer and explore the class hierarchy for System-Message-QueueProcessor.
You see there are two main classes – BrokenItem and DelayedItem – mapped to the tables pr_sys_msg_qp_brokenitems and pr_sys_delayed_queue respectively.
So broken queue processor messages and delayed messages are persisted in those tables.
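If you are curious, you can also peek at those tables directly. Below is a minimal JDBC sketch; the connection URL and credentials are placeholders for your environment – only the table name comes from the class mapping above.

```java
import java.sql.*;

// Minimal sketch: count broken queue items straight from the database.
// Broken items land in pr_sys_msg_qp_brokenitems, delayed items in pr_sys_delayed_queue.
public class QueueTablePeek {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:5432/prpc"; // placeholder; adjust for your install
        try (Connection con = DriverManager.getConnection(url, "pegauser", "password");
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM pr_sys_msg_qp_brokenitems")) {
            rs.next();
            System.out.println("Broken queue items: " + rs.getInt(1));
        }
    }
}
```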
Broken item
Whenever the processing fails, the messages are persisted as broken items.
Delayed item
For delayed items, there are two scenarios for the incoming messages:
a) When you use a dedicated queue processor configured for delayed processing, the message waits here until its scheduled time.
b) When the stream service is not available.
For now, there are no instances in the DelayedItem class.
In the previous article, we saw how the stream service works in Pega. Let's make the stream service unavailable.
Switch to Configure -> Decisioning -> Infrastructure -> Services -> Stream landing page.
Now use the execute button to stop the stream service.
Okay, now let's queue some messages to pyFTSIncrementalIndexer for search queue processing.
Switch to Configure -> System -> Settings -> Search landing page.
Start the re-index
As soon as you start, you will see an instance in the DelayedItem class.
Note: You can click and open the message and view the XML data. pzTimeForProcessing will give you the deadline time to process the message. The property value is similar to pyMinimumDateTimeforprocessing used in agent queues.
Just start the stream service back, and the delayed item disappears. In the backend, the message from the delayed item table gets pushed to the Kafka topic.
2. Stream node dependency
Queue processors can ONLY work together with Stream services.
3. Performance improvement
Pega claims that queue processors support higher throughput and better scaling than agent rules.
Let’s talk about parallel processing.
In an agent rule – you can tag the agent with an associated node type, say BackgroundProcessing.
If you have one server (node) of node type BackgroundProcessing (BP), then your agent rule can run only once at a time. Similarly, in production, if you have 3 BP nodes, your agent rule can run three times in parallel on different servers. This is the maximum parallelism you can achieve with an agent rule – one run per node, so you scale only by adding nodes.
With queue processors, multi-threading is possible.
Multi-threading in a queue processor is directly tied to the partitions in the Kafka cluster.
We know that in a Kafka topic, you can have one or more partitions.
What is a partition in a Kafka topic?
A Kafka topic can be split into multiple partitions, each storing its own subset of the data.
Partitions support parallel processing by splitting the messages across them.
Say a new Kafka topic is created with 20 partitions. Messages pushed to the topic are then distributed across the 20 partitions, either round-robin or to a particular partition determined by the message key – see the sketch just below.
Important note: As mentioned earlier, Pega changed the default partition count from 20 to 6 in Pega 8.6, with a utility to adjust the count if needed.
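Here is a standalone sketch of that partitioning behavior using the plain Kafka producer API. Pega produces these messages internally via Queue-For-Processing; the broker address, payloads, and the use of a case ID as the key are illustrative assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Standalone illustration of how messages spread across a topic's partitions.
public class PartitioningDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key: the producer spreads messages across the 20 partitions
            producer.send(new ProducerRecord<>("PYFTSINCREMENTALINDEXER", "reindex work item"));
            // With a key: hash(key) % partitionCount picks the partition, so all
            // messages with the same key land in the same partition, in order
            producer.send(new ProducerRecord<>("PYFTSINCREMENTALINDEXER", "S-6007", "reindex work item"));
        }
    }
}
```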
So how do the queue processor, Kafka topic, partitions, and threading work together?
For every queue processor created, the Pega engine sets up a topic in the Kafka cluster using the queue processor name.
You see all the topics are split into 20 partitions by default.
You can always change the partition count using a DSS instance:
- Owning ruleset – Pega-Engine
- pyPurpose – prconfig/dsm/services/stream/pyTopicPartitionsCount/default
- pySetting – 20
Check the link for more details
Important note: Be careful when you want to change the default partition count. As per the Kafka documentation, changing the partitions of an existing topic CAN have an impact, since data gets re-distributed across partitions.
Now, do you know the maximum number of threads you can use to process the messages in the Kafka topic? Yes, the answer is 20.
You can have 20 parallel threads processing the messages in the Kafka topic at the same time without any conflict, because each thread consumes from its own partition – see the consumer sketch below.
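A standalone consumer sketch makes that ceiling concrete: Kafka assigns each partition to exactly one consumer within a group, so with 20 partitions at most 20 consumers do useful work, and a 21st would sit idle. This is plain Kafka client code under assumed broker and group settings, not Pega's internal consumer.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Run 20 instances of this (each thread with its own consumer) and every one
// gets a partition; any extra instance would receive no assignment.
public class OnePartitionPerConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "qp-demo");                 // all consumers share one group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("PYFTSINCREMENTALINDEXER"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("partition %d, offset %d: %s%n", r.partition(), r.offset(), r.value());
                }
            }
        }
    }
}
```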
We will see the configuration points for multi-threading a little later in the tutorial.
4. Run context
Queue processors run in the AsyncProcessor requestor type context (before Pega 8.4) or the System Runtime Context (Pega 8.4 onwards).
Open any existing Queue processor rule and click the System runtime context.
You will see the run-time context for the queue processor.
Make sure the right applications are included so that the queue processor rules can be resolved.
For older Pega versions, you may need to update the AsyncProcessor requestor type to include the right access group that provides application access for queue processing.
Since I am using 8.4, the requestor type is not applicable for me!
More details about System runtime context in a separate blog article –
Before jumping into creating a new queue processor rule, we will use an OOTB queue processor to explore its architecture and run.
We will explore the queue processor – pyProcessNotification
Since it is already created OOTB, Topics and partitions should also be there already!
I am checking in my Kafka data folder – C:\PRPCPersonalEdition\tomcat\kafka-data
Data flow run – Each queue processor rule will always be associated with a data flow run work instance of class Pega-DM-DDF-Work
You can use the App explorer to view its instances.
Click and open the pyProcessNotification instance.
You see the data flow has been running since Pega 8.4 Personal Edition was installed. Total processed records = 1.
As a prerequisite to test this flow, I created a new notification rule and configured it in the Sales flow, following my own blog article on notifications.
https://myknowtech.com/tag/notification
In the OOTB activity pxNotify, you will see that the Queue-For-Processing method is used to queue the notification instance to the right queue processor rule.
Okay, all set. Now I am going to create a case to queue a message to the queue processor rule.
1. Check the topic partitions for pyProcessNotification. You should see that one of the partitions was updated today.
It is clear that our message went into partition 1.
2. Check the data flow run. You see the processed records count is 2.
3. You see the offsets for partition 1 are updated correctly in the database table – pr_data_decision_df_part
4. As per the queue processor execute activity – pzProcessNotification – it creates an instance of class Pega-Notification and processes the notification.
5. Finally, the notification is processed and an email is sent to the right email address.
We saw the entire flow of how the queue processor works 😃
Now it’s time to create our Queue processor rule 🙂
Business scenario: For tutorial purposes, I am going to keep it simple – the exact notification use case: we need to send an email to the customer once the Sales case is created. The only small update is that the email should be sent out after an hour!! (delayed)
Note: This scenario can be achieved in multiple ways.
We decided to use queue processors to achieve the use case.
What are the configuration points in a queue processor rule?
Create a new queue processor rule.
System -> Records -> SysAdmin -> Queue processor -> Create New
There is a single main tab that holds all the necessary configuration details.
Definition tab
Enable queue processor – A simple toggle button to enable or disable the queue processor rule.
Processing node settings –
In this block, you can specify the processing node settings.
Associated with node type – you will see the list of available node types.
I recommend you to go through my previous post on node classification –
Important note: Do not confuse the stream service nodes (which store the messages) with this node type (which runs the processing).
Ideally, you can set the BackgroundProcessing node type as the queue processor node.
When to process – You see there are two options.
- Immediate
- Delayed
We know that Kafka messages get processed by consumers as soon as the message is published to a subscribed topic.
Immediate – The message gets published to the topic at the same moment without any delay, and so published messages can be processed immediately.
Delayed – Here you can publish the Kafka message at a later point in time. So what happens to the message in the meantime?
What happens when stream service is not available?
The answer is the same for both questions :). Pega creates an instance in the System-Message-QueueProcessor-DelayedItem class with a deadline time.
Once the time is reached (or the stream service comes back), the message gets published to the Kafka topic! Simple, right? A minimal sketch of this pattern follows.
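Below is a hypothetical, in-memory model of that "persist now, publish when due" pattern. Pega's internals are not public; the DelayedItem record and the sweep loop only mirror the behavior we observed earlier, with a plain list standing in for pr_sys_delayed_queue.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model: hold a message until its deadline, then "publish" it.
public class DelayedQueueSweep {
    record DelayedItem(String topic, String payload, Instant deadline) {}

    private final List<DelayedItem> delayedTable = new ArrayList<>(); // stands in for pr_sys_delayed_queue

    void queueDelayed(String topic, String payload, Instant deadline) {
        delayedTable.add(new DelayedItem(topic, payload, deadline)); // not yet in Kafka
    }

    void sweep(Instant now) {
        for (Iterator<DelayedItem> it = delayedTable.iterator(); it.hasNext(); ) {
            DelayedItem item = it.next();
            if (!item.deadline().isAfter(now)) {
                System.out.printf("publish to topic %s: %s%n", item.topic(), item.payload());
                it.remove(); // the delayed item "disappears", as seen earlier
            }
        }
    }

    public static void main(String[] args) {
        DelayedQueueSweep sweep = new DelayedQueueSweep();
        sweep.queueDelayed("SENDEMAILNOTIFICATION", "S-6007", Instant.now().plusSeconds(300));
        sweep.sweep(Instant.now());                  // too early: nothing published
        sweep.sweep(Instant.now().plusSeconds(301)); // deadline passed: published and removed
    }
}
```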
In our use case, we will explore the delayed processing.
Number of threads per node –
This is simple math. We know that by default 20 partitions are created for any Kafka topic, and hence at most 20 threads can process those partitions simultaneously.
Note that it says number of threads per node.
In my Personal Edition, there is only one node in the cluster, so I can give the maximum useful thread count of 20.
If you have 2 nodes, then you can give 10 on each.
Note: If you give a thread count of 25, the system can create the threads, but the extra ones get no partition assignment and simply sit unused.
Important note: Use this option judiciously, because threads also eat up system resources.
Processing retries
This block defines the error handling for our queue processor rule.
Max attempts – you can define the maximum retry attempts for the queue processor to process the message.
Once the max retry attempts are reached and the processing still ends in error, a broken item instance is created in the class System-Message-QueueProcessor-BrokenItem.
The broken item class is mapped to table – pr_sys_msg_qp_brokenitems
Next question: how frequently will the retry attempts be made?
Here you can specify the delay in minutes and also a delay factor.
Initial delay – specifies how many minutes the processing should wait before the first retry attempt.
Delay factor –
For the subsequent retry attempts, the previous delay is multiplied by this factor.
For example, say the message is scheduled to process by 04:00 PM.
Max attempt = 3; Initial delay = 1 minute and delay factor = 2.0;
First retry attempt will be original process time + initial delay = 04:01 PM.
Here delay = 1;
Second retry attempt will be = First retry attempt time + (delay * delay factor) = 04:01 + ( 1 * 2) = 04:03 PM
Now delay = 2;
Third retry attempt will be = Second retry attempt time + (delay * delay factor) = 04:03 + ( 2 * 2) = 04:07 PM.
This goes on until the max attempt limit is reached!
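To double-check the arithmetic, here is a tiny sketch that reproduces the schedule above. Pega computes this internally on each failure; the loop just mirrors the worked example.

```java
import java.time.LocalTime;

// Reproduces the example: max attempts 3, initial delay 1 minute, delay factor 2.0.
public class RetrySchedule {
    public static void main(String[] args) {
        LocalTime scheduled = LocalTime.of(16, 0); // originally due at 04:00 PM
        int maxAttempts = 3;
        double delayMinutes = 1;   // initial delay
        double delayFactor = 2.0;

        LocalTime attempt = scheduled;
        for (int i = 1; i <= maxAttempts; i++) {
            attempt = attempt.plusMinutes((long) delayMinutes);
            System.out.printf("Retry %d at %s (waited %.0f min)%n", i, attempt, delayMinutes);
            delayMinutes *= delayFactor; // next wait doubles
        }
        // Prints 16:01, 16:03, 16:07 – matching the example above.
    }
}
```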
Processing activity
The name of the block says it all – this is where you specify the processing activity.
As per my simple use case, I am going to directly call the SendSimpleEmail activity by passing the right parameters for the email.
We are done with the configuration. Now the main button ;) – click Save.
You will see the run context message in the top of the rule form.
As soon as you create a new queue processor, two main actions will be executed.
1. Verify the new Topic and partitions created for this new queue processor in the Kafka data folder.
2. Verify the data flow run for the queue processor (Kafka consumer)
Click and open the new data flow work item with the same queue processor name.
You will see the data flow has been started and it is up and running.
Okay now,
– A Kafka topic is created to store the Kafka messages.
– A Kafka consumer (the data flow run) is up and running, ready to process the messages and execute the queue processor activity to send out the email.
The one missing piece: how do we produce the Kafka messages to the topic?
There are two ways to do it.
- Using activity method – Queue-For-Processing
- Using an advanced flow shape – Run in background.
1. Queue-For-Processing
Type of queue – Standard and Dedicated.
What are the differences between Standard and Dedicated queue processor?
There is an OOTB queue processor – pzStandardProcessor – which is referred to as the standard type. All other queue processors are dedicated to their own functionality.
You can look at the pzStandardProcessor rule for more configuration details.
I will give you a scenario. You need to write a one-time utility to update 1 million cases with a new indicator – say a Covid19 indicator – based on certain conditions. Your one-time activity can browse the relevant cases and update them all with the indicator. Updating a million cases will always come with a performance impact.
How do you make use of Kafka's power to process it really quickly? Obviously you need a queue processor to make use of Kafka processing. BUT, do we need to create a separate queue processor for this one-time activity??? The answer is a big NO.
You can make use of the already existing standard queue processor. That indirectly means we reuse the existing topic, partitions, and running data flow work item. All you need to do is provide an executable activity (the activity that updates the Covid19 indicator) for the standard processor to run – see the sketch below.
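Conceptually, the effect resembles the sketch below: a million small keyed messages fanned out over the shared standard topic's existing partitions. In Pega you would simply call Queue-For-Processing with type Standard inside your utility activity; the topic name PZSTANDARDPROCESSOR (assumed from the rule name, following the topic-equals-rule-name convention above) and the activity name UpdateCovid19Indicator are illustrative assumptions, not real rule names.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Conceptual fan-out over the shared standard topic's existing partitions.
public class OneTimeUtilityFanOut {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        List<String> caseIds = List.of("S-1", "S-2", "S-3"); // imagine a million of these
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String caseId : caseIds) {
                // keyed by case ID, so each case's message gets a stable partition
                producer.send(new ProducerRecord<>("PZSTANDARDPROCESSOR", caseId,
                        "{\"activity\":\"UpdateCovid19Indicator\",\"caseId\":\"" + caseId + "\"}"));
            }
        }
    }
}
```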
Standard processors can perform immediate execution, whereas dedicated processors can either do immediate or delayed processing.
In our scenario, we have a dedicated queue processing use case, and we have already created a new queue processor instance.
When you use a dedicated queue processor that is configured to run as delayed, you get an option to specify a date time property or a simple expression for the delayed time.
Lock using –
You have three options
- Primary page – If your primary page has a pzInsKey, you can use this option so that Pega takes care of locking the step page automatically.
- Key defined on property – If your step page uses a key other than pzInsKey, you can specify the property to be used as the key.
- None – If you don't want to use locking, use this option.
Advanced parameters –
Write now – Use this option when you want to queue the message immediately, even if the commit transaction later fails. Use it only when necessary; when unchecked, the message is queued only after the complete transaction succeeds.
Time to acquire lock – 30.
Say your case is locked by another user (see the sketch at the end of this block).
– When the value is greater than 0 – the system keeps trying until the lock timeout is reached (30 minutes here); only then is the retry counter increased.
– When the value is less than 0 – it keeps trying indefinitely without increasing the retry counter.
– When the value is equal to 0 – a lock failure is treated as a retry attempt and the retry counter is increased.
Queue current snapshot of page – If you want to queue the snapshot of the page (the current property values) at the time of queuing, use this option. If not checked, the system will open the instance using pzInsKey at processing time to get the updated, refreshed values.
Alternate access group –
- If empty – processing happens in the context of the operator who queued the message.
- If not empty – the specified access group is used to get the context.
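To summarize the lock option above, here is a hedged sketch of the three time-to-acquire-lock behaviors as described on the rule form (not Pega source): it answers whether a failed lock attempt should bump the retry counter.

```java
// Hypothetical condensation of the three behaviors described above.
public class LockRetrySemantics {
    static boolean countsAsRetry(int timeToAcquireLock, boolean lockTimeoutElapsed) {
        if (timeToAcquireLock > 0) return lockTimeoutElapsed; // keep trying until timeout, then count one retry
        if (timeToAcquireLock < 0) return false;              // retry forever, never count
        return true;                                          // 0: every lock failure counts immediately
    }

    public static void main(String[] args) {
        System.out.println(countsAsRetry(30, false)); // false – still within the timeout window
        System.out.println(countsAsRetry(30, true));  // true  – timeout reached, retry counter bumps
        System.out.println(countsAsRetry(-1, true));  // false – indefinite retries
        System.out.println(countsAsRetry(0, false));  // true  – immediate retry attempt
    }
}
```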
2. Run in background flow shape –
Step 1: Add the advanced shape – Run in background – in the first stage.
Step 2: Configure the Run in background step.
Dedicated type – SendEmailNotification.
You see the configuration points are similar to the Queue-For-Processing method parameters.
Map item ID – you can map this value to a property so that, at a later point in time, we can update or delete the message before it gets published!
Processing Date – For tutorial purposes, I am going to delay publishing the message by only 5 minutes using the expression @addToDate(@CurrentDateTime(), "", "", "5", "") and map it to the ProcessDate property.
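For reference, here is a one-line Java equivalent of what that expression computes, assuming the intent is "now plus 5 minutes" – the moment until which the message waits as a delayed item.

```java
import java.time.ZonedDateTime;

// Assumed equivalent of the rule-form expression: current time plus 5 minutes.
public class ProcessDateDemo {
    public static void main(String[] args) {
        ZonedDateTime processDate = ZonedDateTime.now().plusMinutes(5);
        System.out.println("Publish to Kafka at: " + processDate);
    }
}
```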
We are done with the configurations :). It is time to test.
Test the Data flow processing
Step 1: Create a new sales case. S-6007.
Step 2: Check the MessageID for the case in the clipboard.
Step 3: You will see the same instance in the queue processor DelayedItem class.
Note: Using the message ID as the key, you can delete or update the message before it gets published to Kafka.
Step 4: Wait for 5 minutes; the delayed queue message disappears and gets published to the Kafka topic.
Step 5: The new topic SendEmailNotification partition gets updated with the message data.
Step 6: You will also see the data flow run processing went well.
Step 7: Finally, you will also see the email sent to pegaknowledgesharing@gmail.com
Okay, now let’s see some debugging steps involved in queue processor rules.
How to debug queue processing?
In Admin Studio, you can trace a queue processor run just like an agent run.
You can also remove or re-queue the broken queue items, similar to agents.
Step 1: Open Admin Studio.
Step 2: Click on Queue processors in the left panel to see the list of queue processors.
You can stop the queue processor, trace it, or view the data flow corresponding to the queue processor.
Step 3: You can also process the broken queue items.
Click and open the link. There you can select and either remove or re-queue the instances.
Going forward, Pega claims that agents will be deprecated and we will only be using queue processor rules 🙂
So how do we move all the existing standard agents rules into queue processor rules?
– We know that standard queue processors process messages immediately (real time), whereas dedicated queue processors can process in a delayed time interval.
– Standard agents use database tables for storing messages and an activity rule for processing. Similarly, queue processors use Kafka for storing messages and also use an activity rule for processing.
So one thing is simple: you can reuse the existing activities from standard agents in queue processors.
1. If your standard agent processes real-time messages
All you need to do is:
Step 1: Find the activity from where you queue to the standard agent – queue-for-agent method.
Step 2: Replace the method with Queue-For-Processing.
Use the standard queue processor type, so that you don't need to create a new queue processor rule (since the processing is immediate).
Step 3: Specify the right processing agent activity.
Your standard queue processor will take care of executing the agent activity.
For real-time processing, the expectation is that there are no queue items waiting to be processed when you migrate from the standard agent to the queue processor.
Step 4: Finally, you can disable the standard agent in the agent rule form.
2. If your standard agent processes delayed messages
Step 1: Create a new dedicated queue processor rule and reuse the same agent activity inside.
Step 2: Find the activity from where you queue to the standard agent – queue-for-agent method.
Step 3: Replace the method with Queue-For-Processing and specify the dedicated queue processor rule.
In the delayed processing scenario, there will definitely be some scheduled instances waiting to be processed at a future date.
You can allow those instances to be processed by the standard agent. Once all the queue instances are processed, you can put the standard agent to bed once and for all!
Now a big summary,
– Queue processors can work only when the stream service is available.
– Queue processors support higher throughput and better scaling than standard agent processing.
– The concrete classes System-Message-QueueProcessor-DelayedItem and System-Message-QueueProcessor-BrokenItem support queue processing by storing the delayed and broken messages respectively.
– A dedicated data flow run will always be associated with the queue processor.
– Creating a new queue processor rule automatically creates the topic and partitions in the server's Kafka data folder and also creates a new data flow run.
– Messages to the Kafka topic can be produced by using Queue-For-Processing method or using Run in background smart shape.
– Real-time processing can be achieved using a standard type queue processor, while delayed processing can be achieved using a dedicated type queue processor.
– You can easily replace any standard agent with queue processor rules.
Hope you enjoyed this blog article!!