In the Insights on .NET Concurrency series we covered three main forms of concurrency that can be used to solve different problems. Each form has its own strengths: the right tool for the right job. Parallel programming with the TPL offers an easy way to parallelise operations in order to increase an application’s throughput. Asynchronous programming is becoming a functional requirement for modern UI applications, and we saw that async/await abstracts away a lot of the complexity. Reactive programming with Rx fits scenarios where your application consumes data sources (streams of data).
Join Forces
A nice way to look at the concurrency forms is that they complement each other. Thanks to the breadth of the Rx library, we can convert a Task into an Observable and utilise Rx capabilities together with parallel and asynchronous programming. This is the same way we joined parallel and asynchronous programming in a previous post, but here we are adding Rx.
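As a minimal sketch of the Task-to-Observable conversion described above, the snippet below wraps a single task and awaits the resulting sequence. It assumes the System.Reactive NuGet package is referenced; the class name `TaskToObservableSketch`, the method `ConvertAndAwaitAsync` and the arithmetic workload are hypothetical and exist only for illustration.

```csharp
using System;
using System.Reactive.Linq;            // Select, and await support for IObservable<T>
using System.Reactive.Threading.Tasks; // ToObservable extension for Task<T>
using System.Threading.Tasks;

public static class TaskToObservableSketch
{
    // Hypothetical helper, not part of the series' code.
    public static async Task<int> ConvertAndAwaitAsync()
    {
        // Illustrative workload running on the thread pool.
        Task<int> task = Task.Run(() => 21 * 2);

        // The task becomes a sequence that pushes one value and then completes.
        IObservable<int> source = task.ToObservable();

        // Rx operators can now be applied; awaiting an observable
        // returns the last value it produced.
        return await source.Select(x => x + 1);
    }

    public static async Task Main()
    {
        Console.WriteLine(await ConvertAndAwaitAsync());
    }
}
```

Once the task is an `IObservable<T>`, it can be filtered, merged or given a timeout like any other observable sequence.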
Below is a method that runs three operations in parallel and awaits them asynchronously.
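    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    // Returns Task rather than void so that callers can await it and observe exceptions.
    public async Task ParallelTasksAsync()
    {
        var task1 = Task.Run(() => CpuBoundOperation(1));
        var task2 = Task.Run(() => IoBoundOperation(2));
        var task3 = Task.Run(() => IoBoundOperation(3));

        // WhenAll returns the results in the order the tasks were passed in.
        decimal[] results = await Task.WhenAll(task1, task2, task3);

        foreach (var result in results)
            Debug.WriteLine(result);
    }

    // Simulates CPU-bound work by blocking the thread for the given number of seconds.
    private decimal CpuBoundOperation(int seconds)
    {
        Thread.Sleep(seconds * 1000);
        return (decimal)seconds;
    }

    // Simulates I/O-bound work by blocking the thread for the given number of seconds.
    private decimal IoBoundOperation(int seconds)
    {
        Thread.Sleep(seconds * 1000);
        return (decimal)seconds;
    }

    //Output
    //1
    //2
    //3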
With Rx added to the mix, here is what you can achieve:
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    using System.Threading.Tasks;

    const decimal MinimumThreshold = 2.0m;

    //Scenario 1
    // - Run multiple tasks in parallel (task parallelism)
    // - Only accept results that meet certain criteria
    // - Time out if no matching result is received
    // - Post-process the results, e.g. Sum
    public async Task Scenario1()
    {
        var task1 = Task.Run(() => CpuBoundOperation(1)).ToObservable();
        var task2 = Task.Run(() => IoBoundOperation(2)).ToObservable();
        var task3 = Task.Run(() => IoBoundOperation(3)).ToObservable();

        var tasks = new List<IObservable<decimal>> { task1, task2, task3 };

        var sum = await tasks.Merge()              //make them one observable sequence
            .Where(a => a >= MinimumThreshold)     //value pushed must meet criteria
            .Timeout(TimeSpan.FromSeconds(5))      //throws TimeoutException if no value is pushed within 5 seconds
            .Sum();                                //post-process

        Debug.WriteLine($"sum: {sum}");
    }

    //Output
    //sum: 5

    //Scenario 2
    // - Same pipeline, but each value is accumulated into a captured variable
    public async Task Scenario2()
    {
        var task1 = Task.Run(() => CpuBoundOperation(1)).ToObservable();
        var task2 = Task.Run(() => IoBoundOperation(2)).ToObservable();
        var task3 = Task.Run(() => IoBoundOperation(3)).ToObservable();

        var tasks = new List<IObservable<decimal>> { task1, task2, task3 };

        decimal sum = 0.0m; //shared resource (closure)

        await tasks.Merge()                        //make them one observable sequence
            .Where(a => a >= MinimumThreshold)     //value pushed must meet criteria
            .Timeout(TimeSpan.FromSeconds(5))      //throws TimeoutException if no value is pushed within 5 seconds
            .ForEachAsync(a => sum += a);          //post-process each item

        Debug.WriteLine($"sum: {sum}");
    }

    //Output
    //sum: 5.0

    //Scenario 3
    // - Await all tasks first, then process the combined result with Rx
    public async Task Scenario3()
    {
        var task1 = Task.Run(() => CpuBoundOperation(1));
        var task2 = Task.Run(() => IoBoundOperation(2));
        var task3 = Task.Run(() => IoBoundOperation(3));

        var tasks = new List<Task<decimal>> { task1, task2, task3 };

        var sum = await Task.WhenAll(tasks)        //Task<decimal[]>
            .ToObservable()                        //IObservable<decimal[]>
            .SelectMany(a => a)                    //flattens the array
            .Where(a => a >= MinimumThreshold)
            .Timeout(TimeSpan.FromSeconds(5))
            .Sum();

        Debug.WriteLine($"sum: {sum}");
    }

    //Output
    //sum: 5

    //Scenario 4
    // - The combined result takes 3 seconds, so the 2-second timeout fires
    public async Task Scenario4()
    {
        var task1 = Task.Run(() => CpuBoundOperation(1));
        var task2 = Task.Run(() => IoBoundOperation(2));
        var task3 = Task.Run(() => IoBoundOperation(3));

        var tasks = new List<Task<decimal>> { task1, task2, task3 };

        try
        {
            await Task.WhenAll(tasks)
                .ToObservable()
                .Timeout(TimeSpan.FromSeconds(2))
                .ForEachAsync(a => Debug.WriteLine(a));
        }
        catch (TimeoutException ex)
        {
            Debug.WriteLine($"Error: {ex.Message}");
        }
    }

    //Output
    //Error: The operation has timed out.