This commit is contained in:
99
node_modules/zen-observable/src/extras.js
generated
vendored
Normal file
99
node_modules/zen-observable/src/extras.js
generated
vendored
Normal file
@@ -0,0 +1,99 @@
|
||||
import { Observable } from './Observable.js';
|
||||
|
||||
// Emits all values from all inputs in parallel
|
||||
export function merge(...sources) {
|
||||
return new Observable(observer => {
|
||||
if (sources.length === 0)
|
||||
return Observable.from([]);
|
||||
|
||||
let count = sources.length;
|
||||
|
||||
let subscriptions = sources.map(source => Observable.from(source).subscribe({
|
||||
next(v) {
|
||||
observer.next(v);
|
||||
},
|
||||
error(e) {
|
||||
observer.error(e);
|
||||
},
|
||||
complete() {
|
||||
if (--count === 0)
|
||||
observer.complete();
|
||||
},
|
||||
}));
|
||||
|
||||
return () => subscriptions.forEach(s => s.unsubscribe());
|
||||
});
|
||||
}
|
||||
|
||||
// Emits arrays containing the most current values from each input
|
||||
export function combineLatest(...sources) {
|
||||
return new Observable(observer => {
|
||||
if (sources.length === 0)
|
||||
return Observable.from([]);
|
||||
|
||||
let count = sources.length;
|
||||
let seen = new Set();
|
||||
let seenAll = false;
|
||||
let values = sources.map(() => undefined);
|
||||
|
||||
let subscriptions = sources.map((source, index) => Observable.from(source).subscribe({
|
||||
next(v) {
|
||||
values[index] = v;
|
||||
|
||||
if (!seenAll) {
|
||||
seen.add(index);
|
||||
if (seen.size !== sources.length)
|
||||
return;
|
||||
|
||||
seen = null;
|
||||
seenAll = true;
|
||||
}
|
||||
|
||||
observer.next(Array.from(values));
|
||||
},
|
||||
error(e) {
|
||||
observer.error(e);
|
||||
},
|
||||
complete() {
|
||||
if (--count === 0)
|
||||
observer.complete();
|
||||
},
|
||||
}));
|
||||
|
||||
return () => subscriptions.forEach(s => s.unsubscribe());
|
||||
});
|
||||
}
|
||||
|
||||
// Emits arrays containing the matching index values from each input
|
||||
export function zip(...sources) {
|
||||
return new Observable(observer => {
|
||||
if (sources.length === 0)
|
||||
return Observable.from([]);
|
||||
|
||||
let queues = sources.map(() => []);
|
||||
|
||||
function done() {
|
||||
return queues.some((q, i) => q.length === 0 && subscriptions[i].closed);
|
||||
}
|
||||
|
||||
let subscriptions = sources.map((source, index) => Observable.from(source).subscribe({
|
||||
next(v) {
|
||||
queues[index].push(v);
|
||||
if (queues.every(q => q.length > 0)) {
|
||||
observer.next(queues.map(q => q.shift()));
|
||||
if (done())
|
||||
observer.complete();
|
||||
}
|
||||
},
|
||||
error(e) {
|
||||
observer.error(e);
|
||||
},
|
||||
complete() {
|
||||
if (done())
|
||||
observer.complete();
|
||||
},
|
||||
}));
|
||||
|
||||
return () => subscriptions.forEach(s => s.unsubscribe());
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user