by Mike Ton 9 years ago
Essentially, Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern, such as multicast delegates or events (which are usually multicast delegates). Multicast delegates are not ideal, however, as they exhibit the following less desirable features:
In C#, events have a curious interface. Some find the += and -= operators an unnatural way to register a callback
Events are difficult to compose
Events don't offer the ability to be easily queried over time
Events are a common cause of accidental memory leaks
Events do not have a standard pattern for signaling completion
Events provide almost no help for concurrency or multithreaded applications; e.g. raising an event on a separate thread requires you to do all of the plumbing yourself
Rx looks to solve these problems. Here I will introduce you to the building blocks and some basic types that make up Rx.
public interface IObserver<in T>
{
    //Notifies the observer that the provider has finished sending push-based notifications.
    void OnCompleted();
    //Notifies the observer that the provider has experienced an error condition.
    void OnError(Exception error);
    //Provides the observer with new data.
    void OnNext(T value);
}
Rx has an implicit contract that must be followed. An implementation of IObserver<T> may receive zero or more calls to OnNext(T), optionally followed by a call to either OnError(Exception) or OnCompleted(). This protocol ensures that if a sequence terminates, it is always terminated by an OnError(Exception) or an OnCompleted(). The protocol does not, however, demand that OnNext(T), OnError(Exception) or OnCompleted() ever be called. This enables the concept of empty and infinite sequences. We will look into this more later.
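The contract above can be sketched with nothing but the BCL interfaces. This is a minimal illustration, not how Rx itself is implemented: RecordingObserver<T> and ContractDemo.PushAll are hypothetical names introduced here, and the sketch only shows the "zero or more OnNext, then at most one terminating call" shape.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(T value) => Log.Add("OnNext(" + value + ")");
    public void OnError(Exception error) => Log.Add("OnError(" + error.Message + ")");
    public void OnCompleted() => Log.Add("OnCompleted");
}

public static class ContractDemo
{
    // Hypothetical producer: pushes each value with OnNext, then
    // terminates the sequence with exactly one OnCompleted call.
    public static void PushAll<T>(IObserver<T> observer, IEnumerable<T> values)
    {
        foreach (var value in values)
        {
            observer.OnNext(value);
        }
        observer.OnCompleted();
    }
}
```

Passing an empty collection to PushAll produces an empty sequence: no OnNext calls at all, just the terminating OnCompleted.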
Interestingly, while you will be exposed to the IObservable<T> interface frequently if you work with Rx, in general you will not need to be concerned with IObserver<T>. This is due to Rx providing anonymous implementations via methods like Subscribe.
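To see why you rarely write an IObserver<T> yourself, here is a rough sketch of what a delegate-based "anonymous" observer could look like. DelegateObserver<T> and SubscribeWith are hypothetical names made up for this sketch; Rx's real overloads are called Subscribe and differ in detail (for example, this sketch silently ignores errors when no error handler is supplied).

```csharp
using System;

// Hypothetical stand-in for the observer that a delegate-taking
// Subscribe overload would build on your behalf.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError = null, Action onCompleted = null)
    {
        this.onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
        // Assumption of this sketch: missing handlers become no-ops.
        this.onError = onError ?? (_ => { });
        this.onCompleted = onCompleted ?? (() => { });
    }

    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) => onError(error);
    public void OnCompleted() => onCompleted();
}

public static class DelegateObserverExtensions
{
    // Hypothetical helper: wraps the delegates in an observer and subscribes it.
    public static IDisposable SubscribeWith<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted = null)
    {
        return source.Subscribe(new DelegateObserver<T>(onNext, null, onCompleted));
    }
}
```

With a helper like this, calling code only supplies lambdas, which is the style used in the Subscribe(l => { ... }) snippets further down.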
// 'reader/consumer' interface
IDisposable Subscribe(IObserver<T> observer);
//The returned IDisposable is the only link to the subscription. If it is not captured, that link is lost and the subscription can never be disposed...memory leak
//built in methods gc???
----
IObservable<T> is one of the two new core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft is so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a streaming sequence of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.
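As a rough illustration of how small the interface is, the sketch below hand-rolls a source with just a Subscribe method and hands back an IDisposable that ends the subscription. SimpleSource<T>, Publish, SubscriberCount and LastValueObserver<T> are hypothetical names for this example only; in practice you would normally use the implementations that Rx provides rather than writing your own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled source, for illustration only.
public sealed class SimpleSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public int SubscriberCount => observers.Count;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        // The returned token is the caller's only handle on the subscription.
        return new Unsubscriber(observers, observer);
    }

    // Pushes a value to every current subscriber.
    public void Publish(T value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (var observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> list;
        private readonly IObserver<T> target;

        public Unsubscriber(List<IObserver<T>> list, IObserver<T> target)
        {
            this.list = list;
            this.target = target;
        }

        // Disposing removes the observer, so it stops receiving values.
        public void Dispose() => list.Remove(target);
    }
}

// Hypothetical observer that remembers the last value it was given.
public sealed class LastValueObserver<T> : IObserver<T>
{
    public T Last { get; private set; }
    public int Count { get; private set; }

    public void OnNext(T value)
    {
        Last = value;
        Count++;
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Keeping the IDisposable returned by Subscribe and disposing it when you are done is what lets the source drop its reference to the observer.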
---
.NET already has the concept of Streams with the type and sub types of System.IO.Stream. The System.IO.Stream implementations are commonly used to stream data (generally bytes) to or from an I/O device like a file, network or block of memory. System.IO.Stream implementations can have both the ability to read and write, and sometimes the ability to seek (i.e. fast forward through a stream or move backwards). When I refer to an instance of IObservable<T> as a stream, it does not exhibit the seek or write functionality that streams do. This is a fundamental difference that prevents Rx from being built on top of the System.IO.Stream paradigm. Rx does however have the concept of forward streaming (push), disposing (closing) and completing (eof). Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding. These features are also not an appropriate fit for the existing System.IO.Stream types.
Some others refer to instances of IObservable<T> as Observable Collections, which I find hard to understand. While the observable part makes sense to me, I do not find them like collections at all. You generally cannot sort, insert or remove items from an IObservable<T> instance as you could with a collection. Collections generally have some sort of backing store like an internal array. The values from an IObservable<T> source are not usually pre-materialized as you would expect from a normal collection. There is also a type in WPF/Silverlight called ObservableCollection<T> that does exhibit collection-like behavior, and is very well suited to this description. In fact IObservable<T> integrates very well with ObservableCollection<T> instances. So to save on any confusion we will refer to instances of IObservable<T> as sequences.
While instances of IEnumerable<T> are also sequences, we will adopt the convention that they are sequences of data at rest, and IObservable<T> instances are sequences of data in motion.
// 'write/publisher' interface
// streaming sequence of object T
// IObservable<T>
// While instances of IEnumerable<T> are also sequences, they are data at rest; IObservable<T> instances are data in motion
// not == ioStream; but has streaming(push), disposing(closing) and completing(eof)
// Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding
Bind
Subscribe
if (state is Wave) {
    // Wait for the allotted amount of time
    Observable.Interval(TimeSpan.FromSeconds(wavesFPSGame.SpawnWaitSeconds))
        .Where(_ => wavesFPSGame.WavesState is Wave)
        .Take(wavesFPSGame.KillsToNextWave)
        .Subscribe(l => { SpawnEnemy(); });
}
//Time based functionality without co-routines in controller to trigger waves.
Observable.Interval(TimeSpan.FromMilliseconds(1000))
    .Subscribe(_ => { ExecutedamagePerSecondTick(); })
    .DisposeWhenChanged(Player.healthStateProperty);
//Once per second, execute command to damage player
Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(...);
l=>{ myElement.JumpLock = false; }
//l is the long (Int64) value that Observable.Timer emits when it fires
(statemachine)
{StateMachineProperty}Property.Subscribe(_=>{ {StateMachineProperty}Property.LastState });
//Bind to state machines last state
(sprites)
// Assumption: the sprites are SpriteRenderer components.
var sprites = GetComponentsInChildren<SpriteRenderer>().ToList();
if (value) {
    Observable.Interval(TimeSpan.FromMilliseconds(100))
        .Subscribe(l => {
            // l is the tick counter; showing sprites only on even ticks makes them flash.
            if (Character.isInvulnerable) sprites.ForEach(s => s.enabled = l % 2 == 0);
        }).DisposeWhenChanged(Character._isInvulnerableProperty);
} else {
    sprites.ForEach(s => s.enabled = true);
}
//Flash sprites on and off rapidly.
(uGUI)
Button poorButton; // Our button.
poorButton.AsClickObservable()
    .Where(_ => YOUR FILTER )
    .Throttle(_ => PREVENT SPAMMING )
    .DelayFrame( FRAMES TO DELAY )
    .SkipWhile( SKIP FIRST TAPS IF CONDITION IS NOT MET )
    .Subscribe(_ => { USE LIKE SIMPLE OBSERVABLE });
//uGUI events as observables. A button as an event stream in your view.
(Update)
this.UpdateAsObservable()
    .Where(_ => Input.GetMouseButtonDown(0))
    .Throttle(TimeSpan.FromSeconds(1))
    .Subscribe(_ => { Debug.Log("This is only called once per second. This keeps many clicks from firing this too quickly"); });
this.UpdateAsObservable().Where(_ => Input.GetMouseButtonDown(0)).Subscribe(_ => { Debug.Log("THIS IS CALLED when the mouse button is down"); });
this.UpdateAsObservable().Subscribe(_ => { Debug.Log("THIS IS CALLED EVERY FRAME"); });
(position)
protected override IObservable<Vector3> GetvPosObservable (){ ... }
(overrides)
return PositionAsObservable.Sample(TimeSpan.FromSeconds(1.0f)).Select(p=>CalculatevPos());
//only invokes every x seconds
return PositionAsObservable.Select(p=>CalculatevPos());
//only invokes if position has changed
//p==Vector3 (pos) ???
// Viewmodel collection = ModelCollection
// or p == p
return base.GetvPosObservable ();
Vector3 CalculatevPos (){ ... }
return this.transform.position;
// calculates on each update