Kategoriak: All - subject

arabera Anton Alexeyevich 2 years ago

460

multicasting

Rx.js предоставляет мощные инструменты для работы с асинхронными потоками данных. Одним из основных элементов является Observable, который позволяет создавать и управлять потоками данных.

multicasting

Rx.js 5 (Korniychuk Anton - 6 may 2017)

Обработка ошибок

Как сгенерировать ошибку
II. Бросить исключение # Оно так же попадет в ближайший Error Handler как и при использовании observer.error() но в дополнение будет проброшено дальше и вылетит в консоль как обычное исключение. throw 'my error'; throw new Error('my error'); Что бы исключение не пробрасывалось в консоль его можно поймать например в .catch() вот так. после прохода по такому catch блоку, ошибка становится обычным сообщение как будто оно было отправлено через .next() .catch((err) => Rx.Observable.of(err))
I. Используя observer 1. просто передаст это текст как аргумент в ближайший Error Handler observer.error('my error'); 2. return - удобный способ прервать выполнение ф-и create на этой строке, в остальном работает как и предыдущий способ return observer.error('my error'); 3. то же самое что первый способ, но обернуть объектом Error observer.error(new Error('my error')); 4. то же самое что первый способ, но обернуть в ErrorObservable observer.error(Rx.Observable.throw('my error')); 5. то же самое что первый способ, но обернуть стразу в 2 обертки. Думаю это лишено смысла. observer.error(Rx.Observable.throw(new Error('my error')));
Error Handler - обработчик ошибок. Задается через .catch() ф-ю или вторым аргументом в subscribe()

Конструкторы

Summary: Subject: no replay ReplaySubject: replays many, before or after completion BehaviorSubject: replays one, only before completion AsyncSubject: replays one, only if completed
new Rx.AsyncSubject() # передаст подписчику только последнее значение потока и вызовет у него complete Примечания: 1. работает одинаково как для подписчиков которые подписались до complete так и для тех что после
new Rx.BehaviorSubject(initialValue: any) # сохраняет последнее сообщение которое было в канале. При подписке на канал первым полученным сообщением будет то которое было передано перед подпиской. Или значение по умолчанию если не было передано иного. Примечания: 1. если подписаться после того как канал закрыт последнее сообщение передано не будет. Будет сразу вызван complete но при это .getValue() вернет последнее значение 2. всегда имеет значение
new Rx.ReplaySubject([bufferSize: number], [windowSize: number], [scheduler: Schedule]) # для нового подписчика повторяет последние N сообщений из канала Аргументы: [bufferSize = Number.MAX_VALUE] - сколько элементов повторять [windowSize = Number.MAX_VALUE] - забывать элементы которые старше этого возраста(в ms) [scheduler = Rx.Scheduler.currentThread] Примечания: 1. если подписаться после того как канал закрыт то новому подписчику будет передан весь поток сообщений в соответствии с настройками в конструкторе. И только после этого будет вызван complete
new Rx.Subject() # канал без сохранения последнего значения. Следующее сообщение будет передано только тем подписчикам которые были подписаны на него до передачи этого сообщения.

Создать Observable

const observable = Rx.Observable.create((observer: Observer) => { observer.next(value: T); observer.error(error: any); observer.complete(); observer.closed # boolean - has already been unsubscribed? return () => console.log('Unsubscribed'); }); Примечания: 1. возвращаемая ф-я отличное место для отписки от события, остановки http запроса и т.д.

multicasting операторы

.publish() => .multicast(new Rx.Subject()) .publishReplay(bufferSize: number) => .multicast(new Rx.ReplaySubject(bufferSize: number)) .publishBehavior(initialValue: any) => .multicast(new Rx.BehaviorSubject(initialValue: any)) .publishLast() => .multicast(new Rx.AsyncSubject()) .share() => .publish().refCount() .shareReplay(bufferSize: number) => .publishReplay(bufferSize: number).refCount()
.multicast(subject: Subject | subjectSelector: Function, [selector: Function]): ConnectableObservable # автоматически подписывается на текущий поток и транслирует все сообщения в subject .connect(): Subscription # подписаться на исходный поток и начать трансляцию в subject. Что бы снять подписку нужно вызвать unsubscribe на подписке которую вернет .connect() .refCount(): Observable # подписаться на оригинальный поток только когда на subject появится хотя бы один подписчик. и отписаться когда у subject отпишется последний подписчик Аргументы: subject - Subject to push source elements into subjectSelector - Ф-я которая должна создавать новый subject selector: (shared: Observable) => Observable - sendbox. Takes shared source stream and returns another stream Примечания: 1. работает одинаково для подписок перед connect и после него 2. подписка на реальный поток осуществляется только по вызову .connect()
Example: multicast selector const result = Rx.Observable.interval(1000).take(3) .do((n) => console.log('Source: ', n)) .map(() => Math.random()) .multicast(new Rx.Subject(), (shared) => { const sharedDelayed = shared.delay(500); const merged = shared.merge(sharedDelayed) return merged; }); result.subscribe(console.log);

Результат: Source: 0 0.7615296234846458 0.7615296234846458 Source: 1 0.7958732452458277 0.7958732452458277 Source: 2 0.2250489174779795 0.2250489174779795

Example: subjectSelector function subjectFactory() { console.log('--- New Subject ---'); return new Rx.Subject(); } Notice: finite stream. And it finishes before unsubscribe const observable = Rx.Observable.interval(1000).take(2) .do((n) => console.log('Source:'+n)) .finally(() => console.log('Source done')) .multicast(subjectFactory) .refCount(); const subA = observable .finally(() => console.log('Subject: A done')) .subscribe((n) => console.log('A:'+n)); setTimeout(() => subA.unsubscribe(), 2500); setTimeout(() => { const subB = observable .finally(() => console.log('Subject: B done')) .subscribe((n) => console.log('B:'+n)); setTimeout(() => subB.unsubscribe(), 1000); }, 3000);

Результат: --- New Subject --- Source:0 A:0 Source:1 A:1 Source done Subject: A done --- New Subject --- Source:0 B:0 Subject: B done Source done

Example: refCount Note: infinite stream const observable = Rx.Observable.interval(1000) .do((n) => console.log('Source:'+n)) .finally(() => console.log('Source done')) .multicast(new Rx.Subject()) .refCount() .do((n) => console.log('Subject:'+n)) .finally(() => console.log('Subject done')); const subA = observable.subscribe((n) => console.log('A:'+n)); const subB = observable.subscribe((n) => console.log('B:'+n)); setTimeout(() => { subA.unsubscribe(); subB.unsubscribe(); }, 3000);

Результат: Source:0 Subject:0 A:0 Subject:0 B:0 ... 1 second ... Source:1 Subject:1 A:1 Subject:1 B:1 ... 1 second ... Source:2 Subject:2 A:2 Subject:2 B:2 Subject done Subject done Source done

Example: unsubscribe const observable = Rx.Observable.interval(1000); const connectableObservable = observable .do((n) => console.log(n)) .multicast(new Rx.Subject()); const sub = connectableObservable.connect(); setTimeout(() => sub.unsubscribe(), 3000);

Результат: 1 2 3

Example: multicast, connect Note: finite stream const observable = Rx.Observable.interval(1000).take(5); const connectableObservable = observable.multicast(new Rx.ReplaySubject(1)); connectableObservable.subsctibe(...); # 0..1..2..3..4 connectableObservable.subsctibe(...); # 0..1..2..3..4 connectableObservable.connect(); setTimeout(() => connectableObservable.subscribe(...), 2000); # ..1..2..3..4 # "1" will be shown because replay subject has "one" buffer size