Rx.js 5
(Korniychuk Anton - 6 may 2017)

multicasting операторы

.multicast(subject: Subject | subjectSelector: Function, [selector: Function]): ConnectableObservable
# автоматически подписывается на текущий поток и транслирует все сообщения в subject
.connect(): Subscription
# подписаться на исходный поток и начать трансляцию в subject. Что бы снять подписку нужно вызвать
unsubscribe на подписке которую вернет .connect()
.refCount(): Observable
# подписаться на оригинальный поток только когда на subject появится хотя бы один подписчик.
и отписаться когда у subject отпишется последний подписчик


Аргументы:
subject - Subject to push source elements into
subjectSelector - Ф-я которая должна создавать новый subject
selector: (shared: Observable) => Observable - sendbox. Takes shared source stream and returns another stream
Примечания:
1. работает одинаково для подписок перед connect и после него
2. подписка на реальный поток осуществляется только по вызову .connect()

Example: multicast, connect

Note: finite stream
const observable = Rx.Observable.interval(1000).take(5);
const connectableObservable = observable.multicast(new Rx.ReplaySubject(1));

connectableObservable.subsctibe(...); # 0..1..2..3..4
connectableObservable.subsctibe(...); # 0..1..2..3..4

connectableObservable.connect();

setTimeout(() => connectableObservable.subscribe(...), 2000);
# ..1..2..3..4
# "1" will be shown because replay subject has "one" buffer size

Example: unsubscribe

const observable = Rx.Observable.interval(1000);
const connectableObservable = observable
.do((n) => console.log(n))
.multicast(new Rx.Subject());

const sub = connectableObservable.connect();

setTimeout(() => sub.unsubscribe(), 3000);

Результат:
1
2
3

Example: refCount

Note: infinite stream
const observable = Rx.Observable.interval(1000)
.do((n) => console.log('Source:'+n))
.finally(() => console.log('Source done'))

.multicast(new Rx.Subject())
.refCount()

.do((n) => console.log('Subject:'+n))
.finally(() => console.log('Subject done'));

const subA = observable.subscribe((n) => console.log('A:'+n));
const subB = observable.subscribe((n) => console.log('B:'+n));

setTimeout(() => {
subA.unsubscribe();
subB.unsubscribe();
}, 3000);

Результат:

Source:0
Subject:0
A:0
Subject:0
B:0
... 1 second ...
Source:1
Subject:1
A:1
Subject:1
B:1
... 1 second ...
Source:2
Subject:2
A:2
Subject:2
B:2
Subject done
Subject done
Source done

Example: subjectSelector

function subjectFactory() {
console.log('--- New Subject ---');
return new Rx.Subject();
}

Notice: finite stream. And it finishes before unsubscribe
const observable = Rx.Observable.interval(1000).take(2)
.do((n) => console.log('Source:'+n))
.finally(() => console.log('Source done'))

.multicast(subjectFactory)
.refCount();

const subA = observable
.finally(() => console.log('Subject: A done'))
.subscribe((n) => console.log('A:'+n));

setTimeout(() => subA.unsubscribe(), 2500);

setTimeout(() => {
const subB = observable
.finally(() => console.log('Subject: B done'))
.subscribe((n) => console.log('B:'+n));

setTimeout(() => subB.unsubscribe(), 1000);
}, 3000);

Результат:

--- New Subject ---
Source:0
A:0
Source:1
A:1
Source done
Subject: A done
--- New Subject ---
Source:0
B:0
Subject: B done
Source done

Example: multicast selector

const result = Rx.Observable.interval(1000).take(3)
.do((n) => console.log('Source: ', n))
.map(() => Math.random())
.multicast(new Rx.Subject(), (shared) => {
const sharedDelayed = shared.delay(500);
const merged = shared.merge(sharedDelayed)
return merged;
});

result.subscribe(console.log);

Результат:

Source: 0
0.7615296234846458
0.7615296234846458
Source: 1
0.7958732452458277
0.7958732452458277
Source: 2
0.2250489174779795
0.2250489174779795

.publish() => .multicast(new Rx.Subject())
.publishReplay(bufferSize: number) => .multicast(new Rx.ReplaySubject(bufferSize: number))
.publishBehavior(initialValue: any) => .multicast(new Rx.BehaviorSubject(initialValue: any))
.publishLast() => .multicast(new Rx.AsyncSubject())

.share() => .publish().refCount()
.shareReplay(bufferSize: number) => .publishReplay(bufferSize: number).refCount()

Создать Observable

const observable = Rx.Observable.create((observer: Observer<T>) => {

observer.next(value: T);
observer.error(error: any);
observer.complete();

observer.closed # boolean - has already been unsubscribed?

return () => console.log('Unsubscribed');
});

Примечания:

1. возвращаемая ф-я отличное место для отписки от события, остановки http запроса и т.д.

Конструкторы

new Rx.Subject()
# канал без сохранения последнего значения. Следующее сообщение будет передано только тем подписчикам которые были подписаны на него до передачи этого сообщения.

new Rx.ReplaySubject([bufferSize: number], [windowSize: number], [scheduler: Schedule])
# для нового подписчика повторяет последние N сообщений из канала
Аргументы:
[
bufferSize = Number.MAX_VALUE] - сколько элементов повторять
[windowSize = Number.MAX_VALUE] - забывать элементы которые старше этого возраста(в ms)
[scheduler = Rx.Scheduler.currentThread]
Примечания:
1. если подписаться после того как канал закрыт то новому подписчику будет передан весь поток сообщений в соответствии с настройками в конструкторе. И только после этого будет вызван complete

new Rx.BehaviorSubject(initialValue: any)
# сохраняет последнее сообщение которое было в канале. При подписке на канал первым полученным сообщением будет то которое было передано перед подпиской. Или значение по умолчанию если не было передано иного.
Примечания:
1. если подписаться после того как канал закрыт последнее сообщение передано не будет. Будет сразу вызван complete но при это .getValue() вернет последнее значение
2. всегда имеет значение

new Rx.AsyncSubject()
# передаст подписчику только последнее значение потока и вызовет у него complete
Примечания:
1.
работает одинаково как для подписчиков которые подписались до complete так и для тех что после

Summary:
Subject: no replay
ReplaySubject: replays many, before or after completion
BehaviorSubject: replays one, only before completion
AsyncSubject: replays one, only if completed

Обработка ошибок

Error Handler - обработчик ошибок. Задается через .catch() ф-ю или вторым аргументом в subscribe()

Как сгенерировать ошибку

I. Используя observer

1.
просто передаст это текст как аргумент в ближайший Error Handler
observer.error('my error');
2. return - удобный способ прервать выполнение ф-и create на этой строке, в остальном работает как и предыдущий способ
return observer.error('my error');
3. то же самое что первый способ, но обернуть объектом Error
observer.error(new Error('my error'));
4. то же самое что первый способ, но обернуть в ErrorObservable
observer.error(Rx.Observable.throw('my error'));
5. то же самое что первый способ, но обернуть стразу в 2 обертки. Думаю это лишено смысла.
observer.error(Rx.Observable.throw(new Error('my error')));

II. Бросить исключение
# Оно так же попадет в ближайший Error Handler как и при использовании observer.error()
но в дополнение будет проброшено дальше и вылетит в консоль как обычное исключение.


throw 'my error';
throw new Error('my error');

Что бы исключение не пробрасывалось в консоль его можно поймать например в .catch() вот так.
после прохода по такому catch блоку, ошибка становится обычным сообщение как будто оно было
отправлено через
.next()
.catch((err) => Rx.Observable.of(err))