Today I am working on an IoT enterprise project. As we all know, thousands or even millions of messages arrive at the message broker and wait there to be consumed by decoupled workers that compute and manage all the different types of messages.
The special thing I am currently working on is a kind of configuration worker that is generated automatically by a “main worker”, which pre-filters messages and so on. I will not discuss the architecture in this article (maybe in a later post). This post is not about IoT infrastructure, where the goal is to make parallelism and scalability possible for device-to-cloud communication. It is about achieving the same thing for events inside a software component itself.
The Pipeline-Pattern is the key
Consider the following common scenario: you're trying to handle multiple messages in a software unit, and they arrive as events on a handler. The unit reacts with some computationally heavy algorithms or long-running operations on services, files, and so on. A traditional way would be to hook up a handler that calls some methods to process the content of the arriving message. Because of the nature of event handlers, multiple messages can be handled this way. So why change the pattern?
I think common IoT scenarios show why. They are not that different from events and handlers in your application. The problem is ‘blocked resources’! If some handlers run for a long time, there is no guarantee that other handlers are processed in parallel. Another issue is the scalability of processing. But how can pipelines help?
Now consider the following solution: an event/message arrives and is stored in a queue. At the other end of that queue, a receiver consumes the message whenever it has time to work on messages. A queue is by nature a decoupling system. A sender can keep putting messages into the queue as long as the queue has enough space left. With that concept, an event handler can receive events and does not have to care about anything but its main objective: handling events.
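To make the queue idea concrete before the real handler code, here is a minimal sketch of a bounded queue between one sender and one receiver. The class name `BoundedQueueDemo`, the method `Run` and the use of plain integers as messages are assumptions made for illustration only; they are not part of the project described in this post.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BoundedQueueDemo
{
    // Sends `count` numbers through a queue with room for `capacity` items
    // and returns them in the order the receiver took them out.
    public static List<int> Run(int capacity, int count)
    {
        var queue = new BlockingCollection<int>(capacity);
        var received = new List<int>();

        // Receiver: takes items whenever it has time; waits while the queue is empty.
        var consumer = Task.Run(() =>
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                received.Add(item);
            }
        });

        // Sender: Add waits while the queue is full, so a slow receiver
        // slows the sender down instead of letting memory grow without limit.
        for (var i = 0; i < count; i++)
        {
            queue.Add(i);
        }
        queue.CompleteAdding(); // no more items; lets the receiver loop end

        consumer.Wait();
        return received;
    }
}
```

Calling `BoundedQueueDemo.Run(2, 5)` returns the numbers 0 to 4 in the order they were sent, even though the queue never holds more than two of them at once. The sender only knows the queue, not the receiver, which is exactly the decoupling described above.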
Give me code!
Here is an example of what I mean (it's written in C#).
Old way:
    private void OnReceiveReplyConfigMessage(object sender, IQueuedMessage message)
    {
        if (message.IsUsefull)
        {
            var fileName = message.ParameterFile;
            var parameter = LoadParameter(fileName); // long running
            Process(parameter); // long running
        }
    }
The pipeline-pattern way:
    // Fields, so that the event handler can reach them.
    private BlockingCollection<IQueuedMessage> messageBuffer;
    private CancellationTokenSource cts;

    public void Start()
    {
        //....
        var bufferLimit = 100;
        messageBuffer = new BlockingCollection<IQueuedMessage>(bufferLimit);
        // "Parameter" stands for whatever type LoadParameter returns.
        var fileQ = new BlockingCollection<Parameter>(bufferLimit);
        cts = new CancellationTokenSource();
        var token = cts.Token;
        var acknowledgeHandler = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
        var loadFileTask = acknowledgeHandler.StartNew(() => LoadFile(messageBuffer, fileQ, token), token);
        var processTask = acknowledgeHandler.StartNew(() => Process(fileQ, token), token);
        //....
        this.messageEvent += OnReceiveReplyConfigMessage;
        // Blocks until both stages have finished or the token is cancelled.
        Task.WaitAll(loadFileTask, processTask);
        //clean up....
    }

    private void OnReceiveReplyConfigMessage(object sender, IQueuedMessage message)
    {
        // The handler only enqueues; the long-running work happens in the stages.
        messageBuffer.Add(message, cts.Token);
    }

    private void LoadFile(BlockingCollection<IQueuedMessage> inputQ, BlockingCollection<Parameter> outputQ, CancellationToken token)
    {
        foreach (var item in inputQ.GetConsumingEnumerable(token))
        {
            //... load file
            //... read parameter from file
            var parameter = LoadParameter(item.ParameterFile); // long running
            outputQ.Add(parameter, token);
        }
        outputQ.CompleteAdding(); // nothing more will come for the next stage
    }

    private void Process(BlockingCollection<Parameter> inputQ, CancellationToken token)
    {
        foreach (var item in inputQ.GetConsumingEnumerable(token))
        {
            //do processing stuff here...
        }
    }
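One point the listing leaves open is how the pipeline shuts down, since `Task.WaitAll` only returns once both stages have ended. The following self-contained sketch shows one possible way, assuming the source of events knows when it is done: it calls `CompleteAdding` on the first queue, and each stage passes that signal on to the next. The names `PipelineSketch`, `Load`, `Process` and `Run` are hypothetical, and the "work" is replaced by trivial stand-ins (a file name's length instead of real file loading).

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Stage 1: stands in for "load file". Here it only measures the name's length.
    private static void Load(BlockingCollection<string> input, BlockingCollection<int> output, CancellationToken token)
    {
        try
        {
            foreach (var fileName in input.GetConsumingEnumerable(token))
            {
                output.Add(fileName.Length, token);
            }
        }
        finally
        {
            output.CompleteAdding(); // tell the next stage that nothing more will come
        }
    }

    // Stage 2: stands in for "process". Here it only collects the values.
    private static void Process(BlockingCollection<int> input, List<int> results, CancellationToken token)
    {
        foreach (var value in input.GetConsumingEnumerable(token))
        {
            results.Add(value);
        }
    }

    public static List<int> Run(IEnumerable<string> fileNames)
    {
        var results = new List<int>();
        using (var cts = new CancellationTokenSource())
        using (var messageBuffer = new BlockingCollection<string>(100))
        using (var fileQ = new BlockingCollection<int>(100))
        {
            var token = cts.Token;
            var factory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            var loadTask = factory.StartNew(() => Load(messageBuffer, fileQ, token), token);
            var processTask = factory.StartNew(() => Process(fileQ, results, token), token);

            // This loop plays the role of the event handler: it only enqueues.
            foreach (var name in fileNames)
            {
                messageBuffer.Add(name, token);
            }
            messageBuffer.CompleteAdding(); // shutdown: no more events will arrive

            Task.WaitAll(loadTask, processTask); // returns once both stages have drained
        }
        return results;
    }
}
```

For example, `PipelineSketch.Run(new[] { "a.cfg", "bb.cfg" })` returns a list containing 5 and 6. In a real event-driven component the call to `CompleteAdding` would more likely sit in a stop or dispose method, after unsubscribing from the event.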