Rx.js 5
(Korniychuk Anton - 6 may 2017)
multicasting операторы
.multicast(subject: Subject | subjectSelector: Function, [selector: Function]): ConnectableObservable
# автоматически подписывается на текущий поток и транслирует все сообщения в subject
.connect(): Subscription
# подписаться на исходный поток и начать трансляцию в subject. Что бы снять подписку нужно вызвать
unsubscribe на подписке которую вернет .connect()
.refCount(): Observable
# подписаться на оригинальный поток только когда на subject появится хотя бы один подписчик.
и отписаться когда у subject отпишется последний подписчик
Аргументы:
subject - Subject to push source elements into
subjectSelector - Ф-я которая должна создавать новый subject
selector: (shared: Observable) => Observable - sendbox. Takes shared source stream and returns another stream
Примечания:
1. работает одинаково для подписок перед connect и после него
2. подписка на реальный поток осуществляется только по вызову .connect()
Example: multicast, connect
Note: finite stream
const observable = Rx.Observable.interval(1000).take(5);
const connectableObservable = observable.multicast(new Rx.ReplaySubject(1));
connectableObservable.subsctibe(...); # 0..1..2..3..4
connectableObservable.subsctibe(...); # 0..1..2..3..4
connectableObservable.connect();
setTimeout(() => connectableObservable.subscribe(...), 2000);
# ..1..2..3..4
# "1" will be shown because replay subject has "one" buffer size
Example: unsubscribe
const observable = Rx.Observable.interval(1000);
const connectableObservable = observable
.do((n) => console.log(n))
.multicast(new Rx.Subject());
const sub = connectableObservable.connect();
setTimeout(() => sub.unsubscribe(), 3000);
Результат:
1
2
3
Example: refCount
Note: infinite stream
const observable = Rx.Observable.interval(1000)
.do((n) => console.log('Source:'+n))
.finally(() => console.log('Source done'))
.multicast(new Rx.Subject())
.refCount()
.do((n) => console.log('Subject:'+n))
.finally(() => console.log('Subject done'));
const subA = observable.subscribe((n) => console.log('A:'+n));
const subB = observable.subscribe((n) => console.log('B:'+n));
setTimeout(() => {
subA.unsubscribe();
subB.unsubscribe();
}, 3000);
Результат:
Source:0
Subject:0
A:0
Subject:0
B:0
... 1 second ...
Source:1
Subject:1
A:1
Subject:1
B:1
... 1 second ...
Source:2
Subject:2
A:2
Subject:2
B:2
Subject done
Subject done
Source done
Example: subjectSelector
function subjectFactory() {
console.log('--- New Subject ---');
return new Rx.Subject();
}
Notice: finite stream. And it finishes before unsubscribe
const observable = Rx.Observable.interval(1000).take(2)
.do((n) => console.log('Source:'+n))
.finally(() => console.log('Source done'))
.multicast(subjectFactory)
.refCount();
const subA = observable
.finally(() => console.log('Subject: A done'))
.subscribe((n) => console.log('A:'+n));
setTimeout(() => subA.unsubscribe(), 2500);
setTimeout(() => {
const subB = observable
.finally(() => console.log('Subject: B done'))
.subscribe((n) => console.log('B:'+n));
setTimeout(() => subB.unsubscribe(), 1000);
}, 3000);
Результат:
--- New Subject ---
Source:0
A:0
Source:1
A:1
Source done
Subject: A done
--- New Subject ---
Source:0
B:0
Subject: B done
Source done
Example: multicast selector
const result = Rx.Observable.interval(1000).take(3)
.do((n) => console.log('Source: ', n))
.map(() => Math.random())
.multicast(new Rx.Subject(), (shared) => {
const sharedDelayed = shared.delay(500);
const merged = shared.merge(sharedDelayed)
return merged;
});
result.subscribe(console.log);
Результат:
Source: 0
0.7615296234846458
0.7615296234846458
Source: 1
0.7958732452458277
0.7958732452458277
Source: 2
0.2250489174779795
0.2250489174779795
.publish() => .multicast(new Rx.Subject())
.publishReplay(bufferSize: number) => .multicast(new Rx.ReplaySubject(bufferSize: number))
.publishBehavior(initialValue: any) => .multicast(new Rx.BehaviorSubject(initialValue: any))
.publishLast() => .multicast(new Rx.AsyncSubject())
.share() => .publish().refCount()
.shareReplay(bufferSize: number) => .publishReplay(bufferSize: number).refCount()
Создать Observable
const observable = Rx.Observable.create((observer: Observer<T>) => {
observer.next(value: T);
observer.error(error: any);
observer.complete();
observer.closed # boolean - has already been unsubscribed?
return () => console.log('Unsubscribed');
});
Примечания:
1. возвращаемая ф-я отличное место для отписки от события, остановки http запроса и т.д.
Конструкторы
new Rx.Subject()
# канал без сохранения последнего значения. Следующее сообщение будет передано только тем подписчикам которые были подписаны на него до передачи этого сообщения.
new Rx.ReplaySubject([bufferSize: number], [windowSize: number], [scheduler: Schedule])
# для нового подписчика повторяет последние N сообщений из канала
Аргументы:
[bufferSize = Number.MAX_VALUE] - сколько элементов повторять
[windowSize = Number.MAX_VALUE] - забывать элементы которые старше этого возраста(в ms)
[scheduler = Rx.Scheduler.currentThread]
Примечания:
1. если подписаться после того как канал закрыт то новому подписчику будет передан весь поток сообщений в соответствии с настройками в конструкторе. И только после этого будет вызван complete
new Rx.BehaviorSubject(initialValue: any)
# сохраняет последнее сообщение которое было в канале. При подписке на канал первым полученным сообщением будет то которое было передано перед подпиской. Или значение по умолчанию если не было передано иного.
Примечания:
1. если подписаться после того как канал закрыт последнее сообщение передано не будет. Будет сразу вызван complete но при это .getValue() вернет последнее значение
2. всегда имеет значение
new Rx.AsyncSubject()
# передаст подписчику только последнее значение потока и вызовет у него complete
Примечания:
1. работает одинаково как для подписчиков которые подписались до complete так и для тех что после
Summary:
Subject: no replay
ReplaySubject: replays many, before or after completion
BehaviorSubject: replays one, only before completion
AsyncSubject: replays one, only if completed
Обработка ошибок
Error Handler - обработчик ошибок. Задается через .catch() ф-ю или вторым аргументом в subscribe()
Как сгенерировать ошибку
I. Используя observer
1. просто передаст это текст как аргумент в ближайший Error Handler
observer.error('my error');
2. return - удобный способ прервать выполнение ф-и create на этой строке, в остальном работает как и предыдущий способ
return observer.error('my error');
3. то же самое что первый способ, но обернуть объектом Error
observer.error(new Error('my error'));
4. то же самое что первый способ, но обернуть в ErrorObservable
observer.error(Rx.Observable.throw('my error'));
5. то же самое что первый способ, но обернуть стразу в 2 обертки. Думаю это лишено смысла.
observer.error(Rx.Observable.throw(new Error('my error')));
II. Бросить исключение
# Оно так же попадет в ближайший Error Handler как и при использовании observer.error()
но в дополнение будет проброшено дальше и вылетит в консоль как обычное исключение.
throw 'my error';
throw new Error('my error');
Что бы исключение не пробрасывалось в консоль его можно поймать например в .catch() вот так.
после прохода по такому catch блоку, ошибка становится обычным сообщение как будто оно было
отправлено через .next()
.catch((err) => Rx.Observable.of(err))