Parallel Processing with Constraints

An interesting scenario I recently encountered. I have tried to pare down the context to get to the solution quickly and avoid the nitty-gritty (boring) details. Also, the example shows only 2 threads.
Context:
Different actions (like 1, 2, 3) from customers (like Mr. A, Miss B, ...) are pushed into a stream/queue by an external component. "$" denotes the end/last action from a customer. A thread can 1) process each action, and 2) generate an aggregated view for each customer per session and then write it to the sink. The processing threads are started when the session starts, and a customer cannot re-enter a session once it is done.
Note: all threads mentioned are background threads and are long-running.
Constraints:
Aggregation can be triggered only after all the actions are processed, i.e. the processing of actions A1, A2, and A3 should be done before the aggregation "A$" for customer A is triggered.
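For example, an interleaved stream for two customers A and B might look like the following (the ordering here is purely illustrative):

```
A1, B1, A2, B$, A3, A$
```

Here "B$" can be aggregated as soon as B1 is processed, but "A$" must wait until A1, A2, and A3 are all done.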
1. Single Thread
Well, this is a no-brainer. We have one thread which reads (pulls) from the stream/queue and processes one item at a time. All constraints can be managed with a global context inside the thread.
Below is the code that runs inside the thread.
```python
# customer: {name: [action results]}
customer = {}

def read_from_stream(name, action):
    if action == '$':
        aggregate(name)
    else:
        process(name, action)

def process(name, action):
    result = process_the_custom_action(action)
    # Create the result list on the customer's first action.
    customer.setdefault(name, []).append(result)

def aggregate(name):
    action_results = customer[name]
    view = gen_agg_view(action_results)
    write_to_sink(view)
    # Remove the customer from tracking once the session is done.
    del customer[name]

def process_the_custom_action(action):
    # Heavy-lifting operation; body omitted.
    ...
```
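For completeness, a minimal sketch of how this could be wired up as the single long-running background thread. The queue and the loop below are illustrative, not part of the original setup:

```python
import queue
import threading

# Filled by the external component.
stream = queue.Queue()

def run():
    # Single long-running background loop: one item at a time.
    while True:
        name, action = stream.get()
        read_from_stream(name, action)

threading.Thread(target=run, daemon=True).start()
```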
2. Pull Model - Parallel Processing
Later we realized that the function process_the_custom_action does heavy lifting and is CPU-intensive. We were not able to keep up with the incoming data rate, so memory usage increased drastically, which triggered garbage collection and slowed down the actual code execution.
So, we decided to introduce multiple threads reading from the queue. Same code as above, executed in multiple threads. Each thread would pop (PULL) one item from the queue, process it, and then pop the next. Just to be explicit: each item from the queue is processed by exactly one thread, but which thread gets which item (and in what order) is not guaranteed.
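Sketch-wise, the only change from the single-thread version is running the same loop on more threads (again illustrative):

```python
# Same `run` loop as above, now on two threads pulling from one shared queue.
# Note: the global `customer` dict is now touched by multiple threads with
# no synchronization, and ordering across threads is not guaranteed.
for _ in range(2):
    threading.Thread(target=run, daemon=True).start()
```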
This design had a major challenge in ensuring the constraints are honored.
In the above example, the aggregation "A$" for customer A should be triggered only after the last action "A3" is done. But "A$" and "A3" may be processed by two different threads, e.g. A3 by Thread #1 and A$ by Thread #2. How would Thread #2 know that the processing of A3 is done? 😞
If we really need to go with this approach and also honor the constraints, then broadcast events would have to be fired with all threads subscribed to them. Each thread would have to do individual bookkeeping for all customers being processed across all the threads (dependencies across threads, coupling... huff!!) 😰
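To make that pain concrete, here is a minimal sketch of what such shared bookkeeping could look like, using one condition variable as the "broadcast" channel. Every name here (cond, work, in_flight, results, push_to_stream) is illustrative, not from the original design:

```python
import collections
import threading

# Shared across ALL pulling threads: exactly the coupling described above.
cond = threading.Condition()
work = collections.deque()   # the shared stream/queue
in_flight = {}               # {name: actions dequeued but not yet finished}
results = {}                 # {name: [action results]}

def push_to_stream(name, action):
    # Producer side: enqueue and "broadcast" to all subscribed threads.
    with cond:
        work.append((name, action))
        cond.notify_all()

def worker():
    while True:
        with cond:
            while not work:
                cond.wait()
            name, action = work.popleft()
            if action != '$':
                # Register the action in the same critical section as the
                # pop, so a later '$' can never see it as "not started".
                in_flight[name] = in_flight.get(name, 0) + 1
        if action == '$':
            with cond:
                # Bookkeeping check: wait until every dequeued action for
                # this customer has finished, on whichever thread ran it.
                while in_flight.get(name, 0) > 0:
                    cond.wait()
                action_results = results.pop(name, [])
                in_flight.pop(name, None)
            write_to_sink(gen_agg_view(action_results))
        else:
            result = process_the_custom_action(action)
            with cond:
                results.setdefault(name, []).append(result)
                in_flight[name] -= 1
                cond.notify_all()   # the broadcast the waiting threads rely on
```

Note how every worker now touches shared state under a lock on every item; that is the cross-thread dependency the paragraph above complains about.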
Pros: If any of the threads is killed in between, the data loss would be minimal, i.e. only the customers already read by that specific thread and not yet completed. All NEW data would be processed by one of the threads still alive.
3. Push Model - Parallel Processing
To honor the constraint and still enable parallel processing, we introduced a load balancer and called it the distributor module.
Distributor Module:
- Would have a reference to all the available processing threads.
- Each thread would have a corresponding queue, so No_Of_Threads equals No_Of_Queues.
- Each queue would have an index, ranging from 0 to No_Of_Threads - 1:
0 <= Queue_Index < No_Of_Threads
- Once an item is read, the distributor generates a hash from the customer's "name" (e.g. using SHA-256).
- On the hash we apply a MOD by the No_Of_Threads available to get the queue index:
Queue_Index = hash(customer_name) % No_Of_Threads
- Push the item into the queue at that queue_index.
With this approach, we ensured that items for the same customer are always processed by the same thread. So there is no cross-talk between threads and no external dependency. We could increase or decrease the number of threads (obviously before a session starts, not in between), based on configuration or heuristics.
#### Distributor Module
```python
def read_from_stream(name, action):
    # Hash the customer name and mod by the thread count.
    # No_Of_Threads == No_Of_Queues, so this picks a queue.
    # (A stable hash like SHA-256 can be used instead of the builtin.)
    queue_index = hash(name) % No_Of_Threads
    publish(queue_index, name, action)

def publish(queue_index, name, action):
    # Get the queue reference using the index.
    q = queues[queue_index]
    # Add the item into the corresponding queue.
    q.put((name, action))
```
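The distributor covers only the producing side. Below is a minimal sketch of the consuming side, assuming each worker thread runs the section 1 logic against its own queue with its own private state; the thread count and wiring are illustrative:

```python
import queue
import threading

No_Of_Threads = 2
queues = [queue.Queue() for _ in range(No_Of_Threads)]

def worker(q):
    # Private per-thread state: safe, since a customer is pinned to one thread.
    customer = {}
    while True:
        name, action = q.get()
        if action == '$':
            # Last action: aggregate and stop tracking this customer.
            write_to_sink(gen_agg_view(customer.pop(name)))
        else:
            customer.setdefault(name, []).append(process_the_custom_action(action))

for q in queues:
    threading.Thread(target=worker, args=(q,), daemon=True).start()
```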
Cons: The biggest con: if a thread is killed for whatever reason, all the customers, past and future ones, that hash to that thread's queue would be dropped. In the pull model, only the handful already pulled by the dead thread would have been lost. We could handle this dynamically, e.g. with a heartbeat mechanism that updates No_Of_Threads, and get it working; it is just that it would be yet another piece of infra.
Conclusion:
There are pros and cons to both approaches.
More infra == more code == more maintenance == more to debug == more room for bugs to creep in.
Trying to keep it simple while keeping the tradeoffs minimal is the key. In our case, we went with the push approach, as the chance of a thread crash was minimal, and we also added some defensive coding to drop an item safely if required.