As a data engineering enthusiast, you are probably aware that Apache NiFi is designed to automate the flow of data between software systems. NiFi makes it possible to quickly understand dataflow operations that would otherwise take a significant amount of time to work out.
In this blog, we address a specific problem encountered when working with NiFi. NiFi lets you control the number of concurrent threads at the individual processor level, but it offers no direct way to control the number of concurrent threads at the process group level. We describe an approach that works around this limitation.
A look at the existing concurrency feature in NiFi
As mentioned, NiFi provides concurrency at the individual processor level. For most processors, this is set through the “Concurrent Tasks” option in the Scheduling tab. Certain single-threaded processors, however, do not allow concurrency.
This option allows a processor to run concurrent threads, using system resources at a higher priority than other processors. In addition to this processor-level setting, NiFi has two global settings: the maximum timer-driven thread count and the maximum event-driven thread count. Their default values are 10 and 5 respectively. They control the maximum number of threads NiFi can request from the server to fulfil concurrent task requests from NiFi processor components. These global values can be adjusted in Controller Settings, located via the hamburger menu in the upper right corner of the NiFi UI.
NiFi sets the Maximum Timer Driven Thread Count relatively low so that it can run on commodity hardware. This default can limit performance for large, high-volume dataflows that need a lot of concurrent processing. The general guidance is to set this value to two to four times the number of cores available on the machine running the NiFi service.
NOTE: Thread counts set in the NiFi UI are applied to every node in a NiFi cluster. The cluster UI shows how the total active threads are being used per node.
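The sizing guidance above is simple arithmetic, and a small sketch can make it concrete. The Python below is only an illustration: the function names are hypothetical, and the 3-node cluster is an assumed example, not a figure from this setup.

```python
import os

def suggested_timer_driven_threads(cores, low_factor=2, high_factor=4):
    """Return the (low, high) range given by the 2x to 4x cores guidance."""
    if cores < 1:
        raise ValueError("cores must be at least 1")
    return cores * low_factor, cores * high_factor

def cluster_wide_threads(per_node_threads, node_count):
    """The UI value applies to every node, so the total scales with node count."""
    return per_node_threads * node_count

if __name__ == "__main__":
    cores = os.cpu_count() or 1  # cores visible on this machine
    low, high = suggested_timer_driven_threads(cores)
    print(f"{cores} cores: thread count between {low} and {high}")
    # Assumed example: a 3-node cluster using the upper bound on every node.
    print(f"3 nodes at {high} threads each: {cluster_wide_threads(high, 3)} in total")
```

For example, on an 8-core machine the guidance gives a range of 16 to 32 threads per node.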
Custom approach to set the concurrency at processor group level
To control the number of concurrent threads executed within a processor group, use the NiFi Wait and Notify processors. The signals created by the Notify processor determine how many flow files, and therefore how many threads, are active within the processor group at any time.
Here is a sample use case: create a NiFi flow that processes input flow files in batches. For example, process five inputs at a time within a processor group and keep those five concurrent threads busy. As soon as one or more flow files complete their processing, the freed slots should be used for the queued input flow files. To explain further, suppose there are 40 tables to be processed and only five threads may run in parallel within the actual processor group. Initially, 5 tables run concurrently, taking 5 threads from the system resources, and each of the remaining 35 tables gets a chance only when one of the running threads completes.
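The behaviour described in this use case is that of a counting semaphore with five slots. The following Python sketch is an analogy only, not NiFi code: the table names and the `process_table` stand-in are hypothetical, and the sleep merely imitates work.

```python
import threading
import time

def run_with_limit(items, limit, work):
    """Process every item, never running more than `limit` at once.

    Returns the highest number of items observed in flight together.
    """
    slots = threading.Semaphore(limit)
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def worker(item):
        with slots:  # blocks until one of the `limit` slots is free
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            try:
                work(item)
            finally:
                with lock:
                    state["active"] -= 1

    threads = [threading.Thread(target=worker, args=(item,)) for item in items]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]

def process_table(name):
    time.sleep(0.01)  # stand-in for the real cleansing/transformation work

tables = [f"table_{i:02d}" for i in range(1, 41)]  # 40 hypothetical tables
peak = run_with_limit(tables, limit=5, work=process_table)
print(f"processed {len(tables)} tables, peak concurrency {peak}")
```

All 40 tables are submitted at once, but the semaphore lets only five run at a time; each finishing table frees a slot for a waiting one, which is the effect the Wait and Notify design aims for inside the processor group.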
Now, to set the concurrency, design two processor groups, PG-1 and PG-2:
Thread Controlling Processor Group (PG-1): This is the core controller that manages the number of concurrent threads. It decides how many concurrent threads can run within the processor group PG-2.
Actual Processor Group (PG-2): This is the processor group that performs the functionality that we want to parallelize. For example, it can be a cleansing/transformation operation that runs on all input tables.
The code base (XML file) for this NiFi template is available on GitHub: https://github.com/karthikeyan9475/Control-Number-of-Threads-In-Processor-Group-Level-in-NiFi
How does this work?
As mentioned, this functionality is achieved using the Wait and Notify processors. PG-1 controls the number of flow files that enter PG-2 using gates. Each gate is a Wait processor that opens in response to signals created by a Notify processor. This NiFi template has three gates, and the sections below describe how they work.
PG-1 triggers three flow files via the input port, and each of them performs a specific action.
1st flow file: It triggers the Notify processor to open Gate-1 (a Wait processor) for 5 input flow files (configurable), and then triggers the Notify processor to close Gate-1.
2nd flow file: It sends 5 input flow files to Gate-1, which was opened in the previous step, and they pass through to PG-2.
3rd flow file: It sends the remaining input flow files to Gate-2 (a Wait processor), which acts as a queue. Gate-2 receives a notification whenever one of the previously released flow files completes its processing in PG-2, and it releases one queued input flow file for each Notify signal it receives.
Initially, 5 flow files are released from Gate-1, leaving 35 flow files queued at Gate-2. Each flow file that completes in PG-2 sends a notify signal to Gate-2, and each notify signal releases one flow file from Gate-2. This ensures that only 5 concurrent threads are running in PG-2.
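The gate mechanics above can be traced with a small simulation. The Python sketch below is a deliberately simplified model and an assumption on our part, not the template's implementation: the hypothetical `Gate` class treats a Wait processor as a queue plus a signal counter, with one signal releasing one flow file, and flow files complete one at a time.

```python
from collections import deque

class Gate:
    """Simplified stand-in for a Wait processor: one signal releases one flow file."""

    def __init__(self):
        self.signals = 0
        self.waiting = deque()

    def notify(self, count=1):
        self.signals += count

    def offer(self, flow_file):
        self.waiting.append(flow_file)

    def release(self):
        """Release as many waiting flow files as there are stored signals."""
        released = []
        while self.signals > 0 and self.waiting:
            self.signals -= 1
            released.append(self.waiting.popleft())
        return released

def simulate(total=40, limit=5):
    inputs = [f"table_{i:02d}" for i in range(1, total + 1)]
    gate1, gate2 = Gate(), Gate()

    gate1.notify(limit)              # 1st flow file: open Gate-1 for `limit` inputs
    for ff in inputs[:limit]:        # 2nd flow file: first inputs go to Gate-1
        gate1.offer(ff)
    running = deque(gate1.release())
    for ff in inputs[limit:]:        # 3rd flow file: the rest queue at Gate-2
        gate2.offer(ff)

    peak, completed = len(running), 0
    while running:
        running.popleft()                # one flow file finishes in PG-2
        completed += 1
        gate2.notify()                   # its completion signals Gate-2
        running.extend(gate2.release())  # Gate-2 releases one queued input
        peak = max(peak, len(running))
    return peak, completed, gate2.signals

peak, completed, leftover_signals = simulate()
print(peak, completed, leftover_signals)  # prints: 5 40 5
```

In this model the number of flow files inside PG-2 never exceeds 5 and all 40 complete. The 5 signals left over at the end come from the last five completions, which find nothing queued at Gate-2.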
Screenshot of NiFi flow that shows controls for concurrency:
Screenshot that shows concurrent threads on top of PG-2:
Handling signals created by last few inputs
When the last few input flow files (36, 37, 38, 39 and 40 in the above example) are processed by PG-2, each of them still sends a signal to Gate-2 (which is a queue), even though there are no further input files waiting to be released. If these surplus signals were left in place, they could trigger additional runs within PG-2 when a new set of inputs arrives. This is avoided with a special Neutralization gate that keeps these surplus signal flow files from reaching Gate-2.
Enhancing this solution to multiple applications
The above example covered just one requirement, i.e., processing various input tables that are received concurrently. What if this cleansing process gains recognition at an enterprise level, and source tables from various applications all have to be processed in the processor group PG-2?
Let us say App 1 processes sales tables and App 2 processes finance tables. How do we maintain 5 concurrent threads within PG-2 for each application?
All that is needed is a small modification to the release signal identifier in the Wait and Notify processors: make the signal names attribute-driven instead of hardcoded. The release signal identifier property supports NiFi Expression Language, so by making it attribute-driven, one can control threads within a processor group at application-level granularity.
By default, the Wait processor's expiration duration is 10 minutes, which means queued flow files wait for a notify signal for up to 10 minutes and then expire. Configure the expiration duration to be long enough to avoid flow file expiration.
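To illustrate the effect of an attribute-driven signal identifier, here is a simplified Python model in which signals are counted per identifier. Everything in it is hypothetical: the `app.name` attribute, the `-release` suffix and the `KeyedGate` class are illustrative names, and the model assumes one signal releases one flow file.

```python
from collections import defaultdict, deque

def signal_id_for(flow_file):
    # Plays the role of an attribute-driven release signal identifier such as
    # ${app.name}-release; "app.name" is a hypothetical attribute name.
    return f"{flow_file['app.name']}-release"

class KeyedGate:
    """Simplified Wait/Notify model with a separate signal count per identifier."""

    def __init__(self):
        self.signals = defaultdict(int)
        self.waiting = defaultdict(deque)

    def notify(self, signal_id, count=1):
        self.signals[signal_id] += count

    def offer(self, flow_file):
        self.waiting[signal_id_for(flow_file)].append(flow_file)

    def release(self, signal_id):
        """Release waiting flow files for one identifier only."""
        released = []
        queue = self.waiting[signal_id]
        while self.signals[signal_id] > 0 and queue:
            self.signals[signal_id] -= 1
            released.append(queue.popleft())
        return released

gate = KeyedGate()
for i in range(8):  # 8 hypothetical sales tables
    gate.offer({"app.name": "sales", "table": f"sales_{i}"})
for i in range(3):  # 3 hypothetical finance tables
    gate.offer({"app.name": "finance", "table": f"finance_{i}"})

gate.notify("sales-release", 5)    # 5 slots for App 1
gate.notify("finance-release", 5)  # 5 slots for App 2
sales_running = gate.release("sales-release")
finance_running = gate.release("finance-release")
print(len(sales_running), len(finance_running))  # prints: 5 3
```

Because each application has its own identifier, the sales backlog cannot consume the finance slots: five sales tables start while three wait, and all three finance tables start immediately.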
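A rough estimate can show why the 10-minute default is easily exceeded. The sketch below assumes, purely for illustration, that every flow file takes the same time in PG-2; the 4-minute figure and the function name are hypothetical.

```python
import math

DEFAULT_EXPIRATION_MINUTES = 10  # the default expiration duration noted above

def longest_wait_minutes(queued, limit, minutes_per_file):
    """Rough wait of the last queued flow file, assuming uniform processing time.

    With `limit` slots, completions arrive in waves of `limit`, so the last of
    `queued` files is released after ceil(queued / limit) waves.
    """
    waves = math.ceil(queued / limit)
    return waves * minutes_per_file

wait = longest_wait_minutes(queued=35, limit=5, minutes_per_file=4)
print(wait, wait > DEFAULT_EXPIRATION_MINUTES)  # prints: 28 True
```

Under these assumptions the last of the 35 queued tables waits about 28 minutes, so the expiration duration would need to be set well above the default.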
Debug vs Actual Execution mode:
During development, you will sometimes simulate signal files using GenerateFlowFile processors. This can leave orphan signals waiting for processing when there are no input flow files. In the subsequent run, input flow files will be released by the orphan signals left over from the debugging stage.
In the above scenario, if any flow files go into the success queue, the Wait processor is holding unused signals from the Notify processor that were stored by mistake during development or testing. In the above image, 18 notify signals were triggered by mistake. To make sure that the Wait processor works as expected, all the simulated flow files should go to the wait queue, not the success queue. If this debugging check is skipped, more parallel threads than expected may run inside the actual processor group.
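The impact of orphan signals can be estimated with one line of arithmetic. This sketch assumes a simplified model in which every stored signal releases exactly one queued flow file; the function name is hypothetical, and the numbers reuse the 40-table example and the 18 stray signals mentioned above.

```python
def initial_concurrency(limit, queued, orphan_signals=0):
    """Flow files entering PG-2 at the start of a run in the simplified model.

    `limit` files pass Gate-1, and each orphan signal already stored for
    Gate-2 releases one more queued file straight away.
    """
    return limit + min(orphan_signals, queued)

print(initial_concurrency(limit=5, queued=35))                     # prints: 5
print(initial_concurrency(limit=5, queued=35, orphan_signals=18))  # prints: 23
```

With 18 orphan signals in place, 23 flow files would start together instead of 5, which is why the stray signals need to be cleared before a real run.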
By using the Wait and Notify processors, the number of concurrent threads inside a processor group in NiFi can be controlled. This helps one build complex data flow pipelines with controlled concurrency. Apache NiFi is a powerful tool for end-to-end data management, and it can be used to quickly build advanced applications with flexible architectures and advanced features.