Intro

This is the last form of concurrency we cover in the Insights on .NET Concurrency series. Reactive programming is a paradigm built on the concept of asynchronous data streams. Each asynchronous event feeds into a stream of events, and the stream can be observed, composed, manipulated, filtered, and so on. Its key characteristic is that it is a push model rather than a pull model: data arrives at the stream and is pushed to the subscribers, instead of consumers pulling data from a data source (IObservable<T> vs IEnumerable<T>). Data streams can be anything from events, e.g. mouse clicks and stylus movements, to data values such as simulation results, stock exchange ticks, or news feeds.

Motivation

A web or desktop client needs to show the progress of a certain process. The process is by nature not deterministic, i.e. its steps and their duration vary with the context of the incoming request. The progress data coming from the process can be fed into an observable stream, i.e. the source. The client can then react to any change on the stream by subscribing a handler/observer to the progress observable stream instance.

Another real-life scenario is a user typing into a filter text box in order to filter a list of data. Normally we would like to wait a few milliseconds for the next keystroke before firing the filtering algorithm. Each keystroke is an event, and the pressed key is the data that feeds the observable stream, so this requirement is a good fit for reactive programming (Observable.FromEventPattern).
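The filter box scenario can be sketched in a console-friendly way. This is only an illustration under a few assumptions: the System.Reactive NuGet package is referenced, a Subject<string> stands in for the text box event stream (in a real UI the source would come from Observable.FromEventPattern), a HistoricalScheduler provides a virtual clock so nothing actually waits, and the class name FilterBoxSketch and the 300 ms quiet period are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FilterBoxSketch
{
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler();  // virtual clock: time moves only when we advance it
        var keystrokes = new Subject<string>();     // hypothetical stand-in for the text box events
        var filtersApplied = new List<string>();

        // Throttle lets a value through only after the stream has been quiet for 300 ms.
        using var subscription = keystrokes
            .Throttle(TimeSpan.FromMilliseconds(300), scheduler)
            .Subscribe(text => filtersApplied.Add(text));

        keystrokes.OnNext("r");
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));
        keystrokes.OnNext("re");
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));
        keystrokes.OnNext("rea");
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(400)); // quiet long enough: "rea" passes
        keystrokes.OnNext("reac");
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(400)); // quiet again: "reac" passes

        return filtersApplied;
    }
}
```

Only the text present after a pause reaches the filtering code; the intermediate values "r" and "re" are dropped.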

Reactive programming helps in developing real-time applications and in fire-and-forget scenarios. Rx does not block the main thread while waiting for the results of IO-bound operations, because the results get pushed to the observers. Another key behavioural difference when programming against observable sources is that each item gets pushed to the observers as soon as it arrives, instead of the calling method waiting for the called method to fetch all the data.

Lastly, working with .NET events is in a way inflexible and has its own limitations and quirks. For instance, events are difficult to debug, cumbersome with multithreading as you need to do the marshalling yourself, not queryable, and far less capable than what Rx offers.

Building Blocks

Unlike parallel and asynchronous programming, reactive programming is not part of the TPL; it has its own library called Reactive Extensions for .NET (Rx.NET). At the time of writing, Rx.NET is not part of the shipped .NET system libraries and can be added to your project as a NuGet package.

The major building blocks of the Rx library are the two interfaces IObservable<T> and IObserver<T>, which actually belong to the System namespace in mscorlib.dll.

Observable.Create

This is probably the method you are going to use most during reactive programming, so it is key to get its mechanics right in your head at an early stage. Rx provides a few overloads of the Create method. Let's look at a common one:
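As a minimal sketch, assuming the System.Reactive NuGet package, the overload that takes a Func<IObserver<T>, IDisposable> can be used as follows. The class name CreateSketch and the values pushed are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class CreateSketch
{
    // The delegate receives the observer and runs once per subscription.
    public static IObservable<int> Numbers() =>
        Observable.Create<int>(observer =>
        {
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnNext(3);
            observer.OnCompleted();
            return Disposable.Empty; // nothing to clean up in this sketch
        });

    public static List<string> Run()
    {
        var log = new List<string>();

        // Subscribing is what triggers the delegate above.
        IDisposable subscription = Numbers().Subscribe(
            value => log.Add($"OnNext({value})"),
            () => log.Add("OnCompleted"));

        subscription.Dispose(); // stops receiving (already completed here)
        return log;
    }
}
```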

How to read:

  1. Observable.Create is about creating an observable data source. It creates a source with the help of an internal observer.
  2. The Func uses the observer to push data to the observable (source).
  3. The consumer then subscribes to the created observable (source) and receives any data pushed by the internal observer.
  4. The subscriber stops receiving data by disposing of the subscription.

The Rx library adds a bunch of useful extensions for dealing with sequences of asynchronous events, LINQ-style query capabilities for observable sequences, and schedulers to support concurrency. An observable sequence can be seen as a collection of items, so querying it with LINQ adds new ways to manipulate events, such as Min, Max, Average, Count, GroupBy, Select, Aggregate, etc. On the face of it the IEnumerable<T> and IObservable<T> interfaces look similar, but they differ fundamentally: IEnumerable<T>/IEnumerator<T> is pull, i.e. you query the data source, whereas IObservable<T>/IObserver<T> is push, i.e. the data source notifies you. That is why the two are called duals.
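To illustrate the LINQ-style operators, here is a small sketch assuming the System.Reactive NuGet package. The class name QuerySketch is hypothetical, and the blocking Wait() call is used only to hand the result back to a console caller.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class QuerySketch
{
    // Where/Select work on an observable just as they do on an enumerable.
    public static IList<int> EvenSquares() =>
        Observable.Range(1, 10)
            .Where(n => n % 2 == 0)
            .Select(n => n * n)
            .ToList()   // gathers all items into one list when the sequence completes
            .Wait();    // blocks until completion and returns that list

    // Aggregates such as Max produce a single-value observable.
    public static int LargestSquare() =>
        Observable.Range(1, 10)
            .Select(n => n * n)
            .Max()
            .Wait();
}
```

Here EvenSquares() yields 4, 16, 36, 64, 100 and LargestSquare() yields 100.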

Observable Temperature

There are two types of observable sequence: cold and hot. A cold observable starts pushing data to an observer only when a subscription occurs. A hot observable pushes data regardless of whether any observers are subscribed. When an observer subscribes to a hot observable sequence, it receives only the values pushed from that moment on, starting with whatever is current in the stream, whereas when an observer subscribes to a cold observable the sequence starts over for each subscriber. Examples of cold observables are sequences built with Observable.Create and asynchronous requests. Examples of hot observables are Subject<T>, mouse clicks/position, keystrokes, news feeds, timer ticks, and network packet broadcasts.

Next is a real-life example of an IO-bound operation using the reactive approach.
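One way such an IO-bound source could look is sketched below, assuming the System.Reactive NuGet package. The LineReaderSketch class and its ReadLines method are hypothetical; the overload of Observable.Create that takes an asynchronous delegate with a CancellationToken is used so that disposing the subscription can stop the read loop.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;

public static class LineReaderSketch
{
    // Cold observable: the reader is opened once per subscriber, and each line
    // is pushed as soon as it has been read.
    public static IObservable<string> ReadLines(Func<TextReader> openReader) =>
        Observable.Create<string>(async (observer, cancellationToken) =>
        {
            using var reader = openReader();
            var line = await reader.ReadLineAsync();
            while (line != null && !cancellationToken.IsCancellationRequested)
            {
                observer.OnNext(line);
                line = await reader.ReadLineAsync();
            }
            // Rx signals OnCompleted when this task finishes, or OnError if it throws.
        });
}
```

A caller could pass `() => File.OpenText(path)` for a real file; an in-memory `() => new StringReader("alpha\nbeta")` is enough to try it out.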

The temperature of an observable can be changed using Publish()/RefCount() or Defer(). For instance, a cold observable can be converted to a hot one using Publish()/RefCount(). The example below shows that after converting a cold observable to hot and connecting it, any subscriber that jumps on board later picks up the sequence at its current value, as shown in subscription2.
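The following is a sketch of that conversion, assuming the System.Reactive NuGet package. It uses Publish() with an explicit Connect(), and a HistoricalScheduler as a virtual clock so the timing is repeatable. The TemperatureSketch class and the extra cold subscription (subscription3), added for contrast, are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TemperatureSketch
{
    public static (List<long> First, List<long> Second, List<long> ColdLate) Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock
        IObservable<long> cold = Observable.Interval(TimeSpan.FromSeconds(1), scheduler);
        IConnectableObservable<long> hot = cold.Publish();

        var first = new List<long>();
        var second = new List<long>();
        var coldLate = new List<long>();

        using var subscription1 = hot.Subscribe(v => first.Add(v));
        using var connection = hot.Connect();              // the shared sequence starts here
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2.5));    // ticks 0 and 1 are pushed

        using var subscription2 = hot.Subscribe(v => second.Add(v));     // joins mid-stream
        using var subscription3 = cold.Subscribe(v => coldLate.Add(v));  // cold: starts over
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1.75));   // hot pushes 2 and 3

        return (first, second, coldLate);
    }
}
```

subscription1 sees 0, 1, 2, 3 and subscription2 sees only 2, 3, while the late cold subscriber starts again from 0.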

Subject Class

The Rx library introduced a few out-of-the-box implementations of the major interfaces. Subject<T>, which implements both IObserver<T> and IObservable<T>, acts as both an observable sequence and an observer. In other words, it can act as a proxy object between the publisher and the observer. A subject broadcasts to all subscribed observers in its thread-safe list of subscribers. The code below is similar to the one before, but it uses the Rx Subject object.
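A minimal sketch of a Subject<T> broadcasting to its observers, assuming the System.Reactive NuGet package; the SubjectSketch class name and the values are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectSketch
{
    public static (List<int> Early, List<int> Late) Run()
    {
        var subject = new Subject<int>();
        var early = new List<int>();
        var late = new List<int>();

        using var subscription1 = subject.Subscribe(v => early.Add(v));
        subject.OnNext(1);                 // only the early observer is listening

        using var subscription2 = subject.Subscribe(v => late.Add(v));
        subject.OnNext(2);                 // broadcast to both observers
        subject.OnCompleted();

        return (early, late);
    }
}
```

The early observer receives 1 and 2; the late one receives only 2, because a plain Subject<T> does not replay earlier items.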

There are a few other implementations of the ISubject<T> interface worth exploring, such as ReplaySubject<T>, BehaviorSubject<T>, and AsyncSubject<T>. ReplaySubject<T> caches (or buffers) the items it receives, so items received before subscribing can still be accessed, unlike Subject<T> where any item pushed before the subscription is lost. BehaviorSubject<T>, on the other hand, buffers only the last item received. AsyncSubject<T> caches only the last received item and publishes it ONLY on completion. It should remind you of Task<T>, which delivers a single value (Task.Result) on completion.
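The differences show up when an observer subscribes late. The sketch below, assuming the System.Reactive NuGet package, pushes 1, 2, 3 before the subscription and 4, 5 after it; the SubjectVariantsSketch class and its helper are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectVariantsSketch
{
    // Pushes 1, 2, 3, then subscribes, then pushes 4, 5 and completes.
    private static List<int> LateSubscriber(ISubject<int> subject)
    {
        var received = new List<int>();
        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(3);
        using var subscription = subject.Subscribe(v => received.Add(v));
        subject.OnNext(4);
        subject.OnNext(5);
        subject.OnCompleted();
        return received;
    }

    public static List<int> Plain() => LateSubscriber(new Subject<int>());            // 4, 5
    public static List<int> Replay() => LateSubscriber(new ReplaySubject<int>());     // 1, 2, 3, 4, 5
    public static List<int> Behavior() => LateSubscriber(new BehaviorSubject<int>(0)); // 3, 4, 5
    public static List<int> Async() => LateSubscriber(new AsyncSubject<int>());       // 5
}
```

Note that BehaviorSubject<T> needs an initial value (0 here), which a subscriber would receive if nothing had been pushed yet.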

On the face of it, the Subject class seems to be the one to use from now on wherever reactive is appropriate, since it offers both behaviours. The caveat is that Subject exposes more functionality and state than you normally need (it has extra responsibilities). I am not saying never use it; it was introduced in the API for a reason. Generally speaking, though, use Subject sparingly. Subjects are deemed the 'mutable variables' of the reactive paradigm. The recommended scenario for Subjects is when you need a hot observable and a proxy. For further insights on when to use Subject, see To Use Subject Or Not To Use Subject?.

The example below shows two implementations of a DataProvider class, one using Observable.Create and the other using Subject. Notice that the approach with Subject blocks the thread. That is because it is a hot observable and eagerly evaluated, i.e. it runs as soon as the method is called (unlike Observable.Create, which is lazily evaluated). Also, in certain scenarios cancellation becomes harder when using the Subject class. Note: SubscribeOn is covered in the next section.
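A reduced sketch of the two approaches follows, assuming the System.Reactive NuGet package. The DataProvider members shown here (GetDataWithCreate, GetDataWithSubject, ItemsProduced) are hypothetical names chosen for illustration, and a simple loop stands in for the real data retrieval.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class DataProvider
{
    // Counts how many items the "expensive" loop has produced so far.
    public int ItemsProduced { get; private set; }

    // Lazy: the loop runs only when someone subscribes, once per subscriber.
    public IObservable<int> GetDataWithCreate() =>
        Observable.Create<int>(observer =>
        {
            for (var i = 1; i <= 3; i++)
            {
                ItemsProduced++;
                observer.OnNext(i);
            }
            observer.OnCompleted();
            return Disposable.Empty;
        });

    // Eager: the loop runs on the caller's thread before the method returns,
    // so the caller is blocked and the items are gone before anyone can subscribe.
    public IObservable<int> GetDataWithSubject()
    {
        var subject = new Subject<int>();
        for (var i = 1; i <= 3; i++)
        {
            ItemsProduced++;
            subject.OnNext(i);
        }
        subject.OnCompleted();
        return subject.AsObservable();
    }
}
```

Calling GetDataWithCreate() alone leaves ItemsProduced at 0 until Subscribe is called, whereas GetDataWithSubject() has already produced all three items by the time it returns, and a later subscriber only sees the completion.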

Rx Concurrency

Rx by nature is not concurrent, and hence not multithreaded. Rx is there to handle asynchronous data events in your application, and by default the subscription and the observer code run on the same single thread. The good news is that we can easily configure how Rx schedules work, and potentially subscribe, invoke the observable, and observe notifications on different threads. To understand concurrency in Rx we need to understand how SubscribeOn, ObserveOn, and IScheduler work.

SubscribeOn specifies the scheduler on which the observable will be invoked. ObserveOn specifies the scheduler on which the notifications will be observed.

The scheduler specifies when a certain action is going to take place. Different implementations of the IScheduler interface can specify when and where to perform the action. The available types of Rx schedulers are (Source MSDN):

  1. ImmediateScheduler: The specified action will start immediately.
  2. CurrentThreadScheduler: The specified action will be scheduled (queued) on the thread that made the original call.
  3. DispatcherScheduler: The specified action will be scheduled on the current Dispatcher e.g. WPF app dispatcher.
  4. NewThreadScheduler: The specified action will be scheduled on a newly created thread. Requesting the creation of a thread is expensive. Ideal for long operations e.g. responsive UI.
  5. ThreadPoolScheduler: The specified action will be scheduled on a thread pool thread. Ideal for short operations.
  6. TaskPoolScheduler: The specified action will be scheduled using TaskFactory from TPL. Ideal for short operations.
  7. VirtualTimeScheduler: Useful for testing and debugging by emulating real time.

Note: Prefer TaskPoolScheduler over ThreadPoolScheduler whenever both are available; depending on your platform, only one of them may be supported. TaskPoolScheduler fits the modern .NET abstraction over threads, as it uses Tasks that ultimately get scheduled onto a thread pool thread. The exception is a long operation whose overall computation time makes the thread creation overhead look relatively small; in that case use NewThreadScheduler. TaskPoolScheduler schedules through the TPL, which is optimised for multi-core machines as covered in the first post of this series.

The coding and design guidelines covered in Asynchronous Programming still apply when using Rx to achieve concurrency. For instance, when keeping a UI responsive during a CPU-bound operation by pushing the delegate to a thread pool thread, we must still consider reentrancy and thread affinity.

The following code snippet depicts the workflow of subscription, invocation, and observation using Rx schedulers. The subscription takes place on the original/caller thread, the invocation on a new thread, and the observation on the current context. In this case the context is the WPF dispatcher context, which is the same as the original context.
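Since a WPF dispatcher is not available in a console program, the sketch below substitutes an EventLoopScheduler (a dedicated background thread) for the dispatcher context, so here the observation thread differs from the caller thread. It assumes the System.Reactive NuGet package; the SchedulerSketch class is hypothetical.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerSketch
{
    public static (int Caller, int Invoke, int Observe) Run()
    {
        int callerThread = Environment.CurrentManagedThreadId;
        int invokeThread = -1, observeThread = -1;
        var done = new ManualResetEventSlim();
        using var observerLoop = new EventLoopScheduler(); // stand-in for the dispatcher

        var source = Observable.Create<int>(observer =>
        {
            invokeThread = Environment.CurrentManagedThreadId; // where the source runs
            observer.OnNext(42);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        using var subscription = source
            .SubscribeOn(NewThreadScheduler.Default) // invoke the source on a new thread
            .ObserveOn(observerLoop)                 // deliver notifications on the loop thread
            .Subscribe(
                _ => observeThread = Environment.CurrentManagedThreadId,
                () => done.Set());

        done.Wait(TimeSpan.FromSeconds(5)); // wait for OnCompleted, with a safety timeout
        return (callerThread, invokeThread, observeThread);
    }
}
```

In a WPF application the ObserveOn argument would be the dispatcher scheduler instead, which brings the notifications back to the UI thread.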

Parallel Execution

Start and StartAsync invoke an action or a function asynchronously. The difference is that StartAsync fits methods whose signature returns Task<T>.
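Both can be sketched as below, assuming the System.Reactive NuGet package. Square and SquareAsync are hypothetical methods, and the blocking Wait() is there only to hand the single result back to a console caller.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class StartSketch
{
    public static int Square(int n) => n * n;

    public static Task<int> SquareAsync(int n) => Task.Run(() => n * n);

    // Start runs a synchronous function asynchronously and publishes its result.
    public static int RunStart() =>
        Observable.Start<int>(() => Square(6)).Wait();

    // StartAsync does the same for a function that already returns Task<T>.
    public static int RunStartAsync() =>
        Observable.StartAsync<int>(() => SquareAsync(7)).Wait();
}
```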

Asynchronous Execution

ToAsync is used to invoke a synchronous method asynchronously.
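A small sketch of ToAsync, assuming the System.Reactive NuGet package: it wraps a synchronous delegate into a function that returns an observable of the result. The add delegate and the ToAsyncSketch class are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class ToAsyncSketch
{
    public static int Run()
    {
        Func<int, int, int> add = (a, b) => a + b; // plain synchronous function

        // Same parameters, but the result now arrives through an observable.
        Func<int, int, IObservable<int>> addAsync = Observable.ToAsync(add);

        return addAsync(2, 3).Wait(); // blocks only to return the value from this sketch
    }
}
```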

Next

As you have noticed, the Rx library packs a lot of capabilities and provides a large variety of extension methods to help developers in many scenarios. In this post we tried to cover the subset of the Rx library that fits the scope of this series. I hope you now have a good enough understanding of what reactive programming is, where to use it, and the key Rx types and some of their extension methods.

The next interesting thing is to see how the three forms of concurrency play together. Each form of concurrency has its strengths, and when they join forces they can give extra flexibility and new features to your code. The next post shows how parallel, asynchronous, and reactive programming work together.

Last but not least, I highly recommend the following Rx resources:
