Reactive Extensions

Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.

Data sequences can take many forms, such as a stream of data from a file or web service, web services requests, system notifications, or a series of events such as user input.

Reactive Extensions represents all these data sequences as observable sequences. An application can subscribe to these observable sequences to receive asynchronous notifications as new data arrive.  The Rx library is available for desktop application development in Ruby. It is also released for Python, .NET, Silverlight, Windows Phone 7 and JavaScript. For more information on these different platforms, see Differences Between Versions of Rx topic.

Pulling vs. Pushing Data

In interactive programming, the application actively polls a data source for more information by pulling data from a sequence that represents the source. The iterator allows us to get the current item by returning the current property, and determines whether there are more items to iterate (by calling some on_next method). 

The application is active in the data retrieval process, controlling the pace of the retrieval by calling on_next at its own convenience. This pattern is synchronous, which means that the application might be blocked while polling the data source. Such pulling pattern is similar to visiting your library and checking out a book. After you are done with the book, you pay another visit to check out another one.

On the other hand, in reactive programming, the application is offered more information by subscribing to a data stream (called observable sequence in Rx), and any update is handed to it from the source. The application is passive in the data retrieval process: apart from subscribing to the observable source, it does not actively poll the source, but merely reacts to the data being pushed to it. When the stream has no more data to offer, or when it errs, the source will send a notice to the subscriber. In this way, the application will not be blocked by waiting for the source to update.

This is the push pattern employed by Reactive Extensions. It is similar to joining a book club in which you register your interest in a particular genre, and books that match your interest are automatically sent to you as they are published. You do not need to stand in line to acquire something that you want. Employing a push pattern is helpful in many scenarios, especially in a UI-heavy environment in which the UI thread cannot be blocked while the application is waiting for some events. In summary, by using Rx, you can make your application more responsive.

The push model implemented by Rx is represented by the observable pattern of Rx.Observable/Observer. The Rx.Observable will notify all the observers automatically of any state changes. To register an interest through a subscription, you use the subscribe method of Rx.Observable, which takes on an Observer and returns a disposable. This gives you the ability to track and dispose of the subscription. In addition, Rx’s LINQ implementation over observable sequences allows developers to compose complex event processing queries over push-based sequences such as events, APM-based (“IAsyncResult”) computations, Task-based computations, and asynchronous workflows. For more information on the Observable/Observer classes, see Exploring The Major Classes in Rx. For tutorials on using the different features in Rx, see Using Rx.

Getting Started with Rx

This section describes in general what Reactive Extensions (Rx) is, and how it can benefit programmers who are creating asynchronous applications. The code samples in this documentation were created using Ruby version 2.0.

When Will You Use Rx

This topic describes the advantage of using Rx for users who are currently using the Ruby event model for asynchronous programming.

Advantages of using Rx

Whether you are authoring a traditional desktop or web-based application, you have to deal with asynchronous programming from time to time. Desktop applications have I/O or UI threads that might take a long time to complete and potentially block all other active threads. A user of the modern asynchronous programming model has to manage exceptions and cancellation of events manually. To compose or filter events, he has to write custom code that is hard to decipher and maintain.

In addition, if your application interacts with multiple sources of data, the conventional way to manage all of these interactions is to implement separate methods as event handlers for each of these data streams. For example, as soon as a user types a character, a keydown event is pushed to your keydown event handler method. Inside this keydown event handler, you have to provide code to react to this event, or to coordinate between all of the different data streams and process this data into a useable form.

Using Rx, you can represent multiple asynchronous data streams (that come from diverse sources, e.g., stock quote, tweets, computer events, web service requests, etc.), and subscribe to the event stream using the Observer class. The Observable class maintains a list of dependent Observer threads and notifies them automatically of any state changes. You can query observable sequences using standard LINQ query operators implemented by the Rx.Observable type. Thus you can filter, aggregate, and compose on multiple events easily by using these static LINQ operators. Cancellation and exceptions can also be handled gracefully by using extension methods provided by Rx.

The following example shows how easy it is to implement an observable in Ruby.

 

# Subscribe to an observable

observer = Observer.new do |o|

o.with_on_next {|next_value| puts "Next: #{next_value}" }

o.with_on_error {|exception| puts "Exception: #{exception.message}"}

o.with_on_completed { puts "done!" }    

end

source = Observable.range(1, 10)

res = source.subscribe(observer)

Manipulating Events

In Ruby, events are simple mechanisms for communication between threads. As we have discussed earlier, Rx represents events as a collection of objects: e.g., a MouseMove event contains a collection of Point values. Due to the first-class object nature of observables, they can be passed around as function parameters and returns, or stored in a variable.

Unsubscribing from Events

In Rx, when you subscribe to an observable sequence representing an event stream, you can specify how long you would like to be notified of changes from the sequence (e.g., n iterations, or for a time interval similar to “do not push between 3-5pm”, or when some other event happens).  In addition, when you subscribe to an observable sequence, you get an disposable handle which you can use to unsubscribe (by calling Dispose) to the sequence later.

Installing Rx

This topic describes where you can download the Reactive Extensions (Rx) SDK.

To download Rx

Reactive Extensions is available for different platforms such as Ruby, Python, JavaScript, as well as Windows Phone 7 and .NET frameworks. You can download the libraries, as well as learn about their prerequisites at the Rx MSDN Developer Center.

Differences Between Versions of Rx

The following topic describes the various platforms for which you can develop solutions using Reactive Extensions.

To get the latest release of Rx, as well as learn about its prerequisites, please visit the Rx MSDN Developer Center.

Ruby

Rx for Ruby (Rx.rb) allows you to use Linq operators to create push-based observable collections in Ruby.

Python

RX for Python (Rx.py) allows you to use Linq operators in Python. Rx.py allows you to implement push-based observable collections, allowing users to seamlessly integrate Rx into their existing Python applications.

.NET Framework

The core Rx interfaces, Observable<T> and Observer<T>, ship as part of .NET Framework 4. If you are running on .NET Framework 3.5 SP1, or if you want to take advantage of the LINQ operators implemented in T:Rx.Observable type, as well as many other features such as schedulers, you can download the Rx assemblies in the Rx MSDN Developer Center.

Silverlight

Silverlight disallows you from making cross-threading calls, thus you cannot use a background thread to update the UI. Instead of writing verbose code using the Dispatcher.BeginInvoke call to explicitly execute code on the main UI thread, you can use the factory Observable.Start method provided by the Rx assemblies to invoke an action asynchronously. Cross-threading is taken care of transparently by Rx under the hood.

You can also use the various Observable operator overloads that take in a Scheduler, and specify the T:Rx.Concurrency.DispatcherScheduler to be used.

Javascript

Rx for Javascript (RxJS) allows you to use LINQ operators in JavaScript. It provides easy-to-use conversions from existing DOM, XmlHttpRequest (AJAX), and jQuery events to push-based observable collections, allowing users to seamlessly integrate Rx into their existing JavaScript-based websites.

RxJS brings similar capabilities to client script and integrates with jQuery events (Rx.Observable.FromJQueryEvent). It also supports Script#.

Windows Phone

Windows Phone 7 ships with a version of the Reactive Extensions baked into the ROM of the device. For more information, see Reactive Extensions for .NET Overview for Windows Phone. Documentation for this version of the Reactive Extensions can be found in Windows Phone API library at Microsoft.Phone.Reactive Namespace.

The Rx MSDN Developer Center also contains an updated version of Rx for WP7, which has new definitions in the System.Reactive.Linq namespace. Note that the new APIs will not clash with the library built in to the phone (nor do they replace the version in the ROM). For more information on the differences of these 2 versions, see this Rx team blog post.

Using Rx

This section includes topics that explain how you use Rx to create and subscribe to sequences and using schedulers. It also describes more advanced tasks such as implementing your own operators.

Exploring The Major Classes in Rx

This topic describes the major Reactive Extensions (Rx) classes used to represent observable sequences and subscribe to them.

Observable/Observer

Rx exposes asynchronous and event-based data sources as push-based, observable sequences. This Observable class represents a data source that can be observed, meaning that it can send data to anyone who is interested. It maintains a list of dependent Observer implementations representing such interested listeners, and notifies them automatically of any state changes.

As described in What is Rx, the other half of the push model is represented by the Observer class, which represents an observer who registers an interest through a subscription. Items are subsequently handed to the observer from the observable sequence to which it subscribes.

In order to receive notifications from an observable collection, you use the subscribe method of Observable to hand it an Observer object. In return for this observer, the subscribe method returns a disposable object that acts as a handle for the subscription. This allows you to clean up the subscription after you are done. Calling dispose on this object detaches the observer from the source so that notifications are no longer delivered. As you can infer, in Rx you do not need to explicitly unsubscribe from an event.

Observers support three publication events, reflected by the interface’s methods. on_next can be called zero or more times, when the observable data source has data available. For example, an observable data source used for mouse move events can send out a Point object every time the mouse has moved. The other two methods are used to indicate completion or errors.

The following lists the Observable/Observer modules.

 

module Observable

   def subscribe(observer)

      auto_detach_observer = observer.is_a?(AutoDetachObserver) ? observer : AutoDetachObserver.new(observer)

      subscription = AutoDetachDisposable.new(auto_detach_observer)

      auto_detach_observer.add(subscription)

      subscription.set(@subscribe_action.call(auto_detach_observer))

      subscription

end

class Observer

   @on_next_action

   @on_error_action

   @on_completed_action

 

   def on_next(value)

      @on_next_action.call(value)

   end

   def on_error(exception)

      @on_error_action.call(exception)

   end

   def on_completed

      @on_completed_action.call

   end

end

For each publication event (on_next, on_error, on_completed) of an observable sequence, you can specify a delegate that will be invoked, as shown in the following example. If you do not specify an action for an event, the default behavior will occur.

 

observer = Observer.new do |o|

o.with_on_next {|next_value| puts "Next: #{next_value}" }

o.with_on_error {|exception| puts "Exception: #{exception.message}"}

o.with_on_completed { puts "done!" }    

end

source = Observable.Range(1, 5) # creates an observable sequence of 5 integers, starting from 1

subscription = source.Subscribe(observer)

 

You can treat the observable sequence (such as a sequence of mouse-over events) as if it were a normal collection. Thus you can write LINQ queries over the collection to do things like filtering, grouping, composing, etc.

See Also

Creating and Subscribing to Simple Observable Sequences

Querying Observable Collections using LINQ Operators

Creating and Querying Observable Sequences

This section describes how you can create and subscribe to an observable sequence and query it.

Creating and Subscribing to Simple Observable Sequences

By installing the Reactive Extension assemblies, you can take advantage of the Rx.Observable class which provides many static LINQ operators for you to create a simple sequence with zero, one or more elements. In addition, Rx provides subscribe extension methods that take various combinations of on_next, on_error and on_completed handlers in terms of delegates.

Creating and subscribing to a simple sequence

The following sample uses the range operator of the Rx.Observable type to create a simple observable collection of numbers. The observer subscribes to this collection using the subscribe method of the Rx.Observable class, and provides actions that are delegates which handle on_next, on_error and on_completed.

The range operator has several overloads. In our example, it creates a sequence of integers that starts with x and produces y sequential numbers afterwards. 

As soon as the subscription happens, the values are sent to the observer. The on_next delegate then prints out the values.

 

#!/usr/bin/ruby

require "Rx"

class Program

    observer = Observer.new do |o|

    o.with_on_next {|next_value| puts "Next: #{next_value}" }

    o.with_on_error {|exception| puts "Exception: #{exception.message}"}

    o.with_on_completed { puts "done!" }

    end

    source = Observable.range(1, 10)

    subscription = source.subscribe(observer)

        subscription.dispose()

end

 

When an observer subscribes to an observable sequence, the thread calling the Subscribe method can be different from the thread in which the sequence runs till completion. Therefore, the Subscribe call is asynchronous in that the caller is not blocked until the observation of the sequence completes.

Notice that the subscribe method returns a disposable, so that you can unsubscribe to a sequence and dispose of it easily. When you invoke the dispose method on the observable sequence, the observer will stop listening to the observable for data.  Normally, you do not need to explicitly call dispose unless you need to unsubscribe early, or when the source observable sequence has a longer life span than the observer. When the disposable instance is collected by the garbage collector, Rx does not automatically dispose of the subscription. However, note that the default behavior of the Observable operators is to dispose of the subscription as soon as possible (i.e, when an on_completed or on_error messages is published). 

Converting an Enumerable Collection to an Observable Sequence

Using the to_observable operator, you can convert a generic enumerable collection to an observable sequence and subscribe to it.

observer = Observer.new do |o|

    o.with_on_next {|next_value| puts "Next: #{next_value}" }

    o.with_on_error {|exception| puts "Exception: #{exception.message}"}

    o.with_on_completed { puts "done!" }

 end

 

e = [ 1, 2, 3, 4, 5 ]

 

source = e.to_observable

subscription = source.Subscribe(observer)

Querying Observable Sequences using LINQ Operators

In this topic, we will look at the first-class nature of observable sequences as Observable objects, in which generic LINQ operators are supplied by the Rx module to manipulate these objects. Most operators take an observable sequence and perform some logic on it and output another observable sequence.

Using Different Operators

In this topic, we will use static LINQ operators of the Observable type so that you can filter, group and transform data. Such operators take observable sequence(s) as input, and produce observable sequence(s) as output.

Combining different sequences

In this section, we will examine some of the operators that combine various observable sequences into a single observable sequence. Notice that data are not transformed when we combine sequences.  

In the following sample we use the merge operator. If you run the following sample code, you will get 1,1,2,2,3,3. This is because the two sequences are active at the same time and values are pushed out as they occur in the sources. The resultant sequence only completes when the last source sequence has finished pushing values.

Notice that for merge to work, all the source observable sequences need to be of the same type of Observable. The resultant sequence will be of the type Observable<T>. If source1 produces an on_error in the middle of the sequence, then the resultant sequence will complete immediately.

 

observer = Observer.new do |o|

    o.with_on_next {|next_value| puts "Next: #{next_value}" }

    o.with_on_error {|exception| puts "Exception: #{exception.message}"}

    o.with_on_completed { puts "done!" }

end

source1 = Observable.range(1, 3)

source2 = Observable.range(1, 3)

source1.merge(source2).subscribe(observer)

Filtering

In the following example, we use the generate operator to create a simple observable sequence of numbers. The generate operator has several overloads. In our example, it takes an initial state (0 in our example), a conditional function to terminate (fewer than 10 times), a result selector (a square function of the current value), an iterator (+1), and print out only those smaller than 15 using the where operator.

 

observer = Observer.new do |o|

    o.with_on_next {|next_value| puts "Next: #{next_value}" }

    o.with_on_error {|exception| puts "Exception: #{exception.message}"}

    o.with_on_completed { puts "done!" }

end

seq = Observable.generate(

0,

lambda {|x| x < 10},

lambda {|x| x*x},

lambda {|x| x + 1})

source = seq.where {|x| x < 15}

source.subscribe(observer)# output is 0, 1, 4, 9

Implementing Your Own Operators for Observable

You can extend Rx by adding new operators for operations that are not provided by the Rx library, or by creating your own implementation of standard query operators to improve readability and performance. Writing a customized version of a standard LINQ operator is useful when you want to operate with in-memory objects and when the intended customization does not require a comprehensive view of the query.

Creating New Operators

Rx offers a set of operators that cover some of the possible operations on a set of entities. However, you might need an operator to add a particular semantic meaning to your query—especially if you can reuse that same operator several times in your code.

By reusing existing LINQ operators when you build a new one, you can take advantage of the existing performance or exception handling capabilities implemented in the Rx libraries.

When writing a custom operator, it is good practice not to leave any disposables unused; otherwise, you may find that resources could actually be leaked and cancellation may not work correctly.

Customizing Existing Operators

Adding new operators to LINQ is a way to extend its capabilities. However, you can also improve code readability by wrapping existing operators into more specialized and meaningful ones.

Last edited Jul 11, 2013 at 10:01 PM by Snesha, version 2

Comments

No comments yet.