Categories: All - events

by Mike Ton 9 years ago

436

Unity_Rx

Rx, short for Reactive Extensions, is built on the Observer pattern, which .NET supports through multicast delegates or events. However, multicast delegates and events have several limitations, such as being difficult to compose, prone to memory leaks, and lacking built-in concurrency support.

Unity_Rx

Unity_Rx

Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as multicast delegates or events (which are usually multicast delegates). Multicast delegates are not ideal however as they exhibit the following less desirable features;

In C#, events have a curious interface. Some find the += and -= operators an unnatural way to register a callback

Events are difficult to compose

Events don't offer the ability to be easily queried over time

Events are a common cause of accidental memory leaks

Events do not have a standard pattern for signaling completion

Events provide almost no help for concurrency or multithreaded applications. e.g. To raise an event on a separate thread requires you to do all of the plumbing

Rx looks to solve these problems. Here I will introduce you to the building blocks and some basic types that make up Rx.

public

interface
IObserver{ ... }

void

OnCompleted();

//Notifies the observer that the provider has finished sending push-based notifications.

OnError(Exception error);

//Notifies the observer that the provider has experienced an error condition.

OnNext(T value);

//Provides the observer with new data.

IObserver

Rx has an implicit contract that must be followed. An implementation of IObserver<T> may have zero or more calls to OnNext(T) followed optionally by a call to either OnError(Exception) or OnCompleted(). This protocol ensures that if a sequence terminates, it is always terminated by an OnError(Exception), or an OnCompleted(). This protocol does not however demand that an OnNext(T), OnError(Exception) or OnCompleted() ever be called. This enables to concept of empty and infinite sequences. We will look into this more later.

Interestingly, while you will be exposed to the IObservable<T> interface frequently if you work with Rx, in general you will not need to be concerned with IObserver<T>. This is due to Rx providing anonymous implementations via methods like Subscribe.

// 'reader/consumer' interface

IObservable{ ... }

IDisposable

//only link to subscription, if not captured is lost...can't gc...memory leak

//built in methods gc???

Subscribe(IObserver observer);

You should be able to think of anything that implements IObservable<T> as a streaming sequence of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.

----

IObservable<T> is one of the two new core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft is so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a streaming sequence of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.

---

.NET already has the concept of Streams with the type and sub types of System.IO.Stream. The System.IO.Stream implementations are commonly used to stream data (generally bytes) to or from an I/O device like a file, network or block of memory. System.IO.Stream implementations can have both the ability to read and write, and sometimes the ability to seek (i.e. fast forward through a stream or move backwards). When I refer to an instance of IObservable<T> as a stream, it does not exhibit the seek or write functionality that streams do. This is a fundamental difference preventing Rx being built on top of the System.IO.Stream paradigm. Rx does however have the concept of forward streaming (push), disposing (closing) and completing (eof). Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding. These features are also not an appropriate fit for the existing System.IO.Stream types. Some others refer to instances of IObservable<T> as Observable Collections, which I find hard to understand. While the observable part makes sense to me, I do not find them like collections at all. You generally cannot sort, insert or remove items from an IObservable<T> instance like I would expect you can with a collection. Collections generally have some sort of backing store like an internal array. The values from an IObservable<T> source are not usually pre-materialized as you would expect from a normal collection. There is also a type in WPF/Silverlight called an ObservableCollection<T> that does exhibit collection-like behavior, and is very well suited to this description. In fact IObservable<T> integrates very well with ObservableCollection<T> instances. So to save on any confusion we will refer to instances of IObservable<T> as sequences. While instances of IEnumerable<T> are also sequences, we will adopt the convention that they are sequences of data at rest, and IObservable<T> instances are sequences of data in motion.

// 'write/publisher' interface

// streaming sequence of object T

// IObservable instances are sequences of data in motion

// While instances of IEnumerable are also sequences, we will adopt the convention that they are sequences of data at rest, and

// not == ioStream; but has streaming(push), disposing(closing) and completing(eof)

// Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding

(def)

Rx
// Rx offers the ability to query data in motion
LINQ
// allows you query data at rest

(example)

Observable
??? What is the difference between

Bind

Subscribe

(Transform)
Timer

if (state is Wave) { // Wait for the alloted amount of time Observable.Interval(TimeSpan.FromSeconds(wavesFPSGame.SpawnWaitSeconds)) .Where(_ => wavesFPSGame.WavesState is Wave) .Take(wavesFPSGame.KillsToNextWave) .Subscribe(l => { SpawnEnemy(); }); }

//Time based functionality without co-routines in controller to trigger waves.

Observable.Interval(TimeSpan.FromMilliseconds(1000)) .Subscribe(_ => { ExecutedamagePerSecondTick(); }).DisposeWhenChanged(Player.healthStateProperty);

//Once per second, execute command to damage player

Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(...);

l=>{ myElement.JumpLock = false; }

//l=> ; l==long???

View(Mono)

(statemachine)

{StateMachineProperty}Property.Subscribe(_=>{ {StateMachineProperty}Property.LastState });

//Bind to state machines last state

(sprites)

var sprites = GetComponentsInChildren().ToList (); if(value){ Observable.Interval(TimeSpan.FromMilliseconds(100)) .Subscribe(l => { if (Character.isInvulnerable) sprites.ForEach(s => s.enabled = 1 % 2 == 0); }).DisposeWhenChanged(Character._isInvulnerableProperty); } else { sprites.ForEach(s => s.enabled = true); }

//Flash sprites on and off rapidly.

(uGUI)

Button poorButton; // Our button. poorButton.AsClickObservable() .Where(_ => YOUR FILTER ) .Throttle(_ => PREVENT SPAMMING ) .DelayFrame( FRAMES TO DELAY ) .SkipWhile( SKIP FIRST TAPS IF CONDITION IS NOT MET ) .Subscribe(_ => { USE LIKE SIMPLE OBSERVABLE });

//uGUI events as observables. A button as an event stream in your view.

(Update)

this.UpdateAsObservable().Where(_=>Input.GetMouseButtonDown(0)).Throttle(TimeSpan.FromSeconds(1)).Subscribe(_=>{ Debug.Log("This is only called once person second. This keeps many clicks from firing this to quickly") });

this.UpdateAsObservable().Where(_=>Input.GetMouseButtonDown(0)).Subscribe(_=>{ Debug.Log("THIS IS CALLED when the mouse button is down") });

this.UpdateAsObservable().Subscribe(_=>{ Debug.Log("THIS IS CALLED EVERY FRAME") });

(position)

protected

override

IObservable

GetvPosObservable (){ ... }

(overrides)

return PositionAsObservable.Sample(TimeSpan.FromSeconds(1.0f)).Select(p=>CalculatevPos());

//only invokes every x seconds

return PositionAsObservable.Select(p=>CalculatevPos());

//only invokes if postion has changed

//p==Vector3 (pos) ???

// Viewmodel collection = ModelCollection class

// or p == p class (property Class for viewmodel )

return base.GetvPosObservable ();

Vector3

CalculatevPos (){ ... }

return this.transform.position;

// calculates on each update