-
Notifications
You must be signed in to change notification settings - Fork 0
/
observable.js
104 lines (104 loc) · 3.18 KB
/
observable.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"use strict";
/**
 * Wraps a destination observer and adds unsubscribe bookkeeping.
 *
 * While subscribed, `next` and `complete` are forwarded to the destination.
 * After `unsubscribe()` (called explicitly, or implicitly by `complete()`),
 * both become no-ops and the optional `unsub` teardown has run at most once.
 */
class SafeObserver {
    /**
     * @param {{next?: Function, complete?: Function, unsub?: Function}} destination
     *   Object whose handlers receive the forwarded notifications. A truthy
     *   `unsub` property on it is adopted as this observer's teardown.
     */
    constructor(destination) {
        this.isUnsubscribed = false;
        this.destination = destination;
        if (destination.unsub) {
            this.unsub = destination.unsub;
        }
    }

    /**
     * Forwards a value to the destination unless already unsubscribed.
     * @param {*} value - The value to deliver.
     */
    next(value) {
        if (this.isUnsubscribed) {
            return;
        }
        // Handlers are optional: a consumer may only care about completion.
        if (typeof this.destination.next === 'function') {
            this.destination.next(value);
        }
    }

    /**
     * Notifies the destination of completion, then unsubscribes.
     * Does nothing when already unsubscribed.
     */
    complete() {
        if (this.isUnsubscribed) {
            return;
        }
        try {
            if (typeof this.destination.complete === 'function') {
                this.destination.complete();
            }
        } finally {
            // Completion is terminal: release resources even when the
            // destination's handler throws (the error still propagates).
            this.unsubscribe();
        }
    }

    /**
     * Marks this observer as unsubscribed and runs the teardown stored in
     * `unsub`, if any. Subsequent calls are no-ops.
     */
    unsubscribe() {
        if (this.isUnsubscribed) {
            return;
        }
        this.isUnsubscribed = true;
        if (this.unsub) {
            this.unsub();
        }
    }
}
/**
 * A minimal push-based stream.
 *
 * An Observable wraps a producer function. Nothing happens until
 * `subscribe` is called; each subscription runs the producer with a fresh
 * observer and receives a function that cancels that subscription.
 */
class Observable {
    /**
     * @param {function(Object): (function(): void)} _subscribe
     *   Producer. Receives an observer exposing `next(value)` and
     *   `complete()`, and returns a teardown function.
     */
    constructor(_subscribe) {
        this._subscribe = _subscribe;
    }

    /**
     * Starts the producer and delivers its notifications to the handlers.
     *
     * @param {function(*): void} next - Called for every emitted value.
     * @param {function(): void} [complete] - Called once on completion.
     *   When omitted, a default handler logs 'complete' to the console.
     * @returns {function(): void} Call to cancel the subscription.
     */
    subscribe(next, complete) {
        const safeObserver = new SafeObserver({
            next,
            complete: complete ? complete : () => console.log('complete'),
        });
        const teardown = this._subscribe(safeObserver);
        safeObserver.unsub = teardown;
        // A producer that finishes synchronously shuts the observer down
        // before its teardown is known; run the teardown now so resources
        // acquired by the producer are still released.
        if (safeObserver.isUnsubscribed && typeof teardown === 'function') {
            teardown();
        }
        return safeObserver.unsubscribe.bind(safeObserver);
    }

    /**
     * Creates a stream of events dispatched on `el`.
     *
     * @param {Object} el - Object with `addEventListener` / `removeEventListener`.
     * @param {string} name - Event name to listen for.
     * @returns {Observable} Emits every event; the listener is removed on unsubscribe.
     */
    static fromEvent(el, name) {
        return new Observable((observer) => {
            const listener = (event) => observer.next(event);
            el.addEventListener(name, listener);
            return () => el.removeEventListener(name, listener);
        });
    }

    /**
     * Creates a stream that emits the elements of `arr` synchronously on
     * subscription and then completes.
     *
     * @param {Array} arr - Values to emit, in order.
     * @returns {Observable}
     */
    static fromArray(arr) {
        return new Observable((observer) => {
            arr.forEach((el) => observer.next(el));
            observer.complete();
            return () => { };
        });
    }

    /**
     * Creates a stream that ticks every `milliseconds`.
     *
     * Each tick emits the running total of `milliseconds` added so far
     * (a nominal elapsed time, not a measured one).
     *
     * @param {number} milliseconds - Timer period.
     * @returns {Observable} The timer is cleared on unsubscribe.
     */
    static interval(milliseconds) {
        return new Observable((observer) => {
            let elapsed = 0;
            const handle = setInterval(() => {
                elapsed += milliseconds;
                observer.next(elapsed);
            }, milliseconds);
            return () => clearInterval(handle);
        });
    }

    /**
     * @param {function(*): *} transform - Applied to every value.
     * @returns {Observable} Emits `transform(value)` for each source value.
     */
    map(transform) {
        return new Observable((observer) => this.subscribe(
            (value) => observer.next(transform(value)),
            () => observer.complete()));
    }

    /**
     * Runs a side effect for every value and passes the value through.
     * Like the other operators this is lazy: `f` only runs once the
     * returned Observable is subscribed to.
     *
     * @param {function(*): void} f - Side effect.
     * @returns {Observable} Emits the source values unchanged.
     */
    forEach(f) {
        return new Observable((observer) => this.subscribe(
            (value) => {
                f(value);
                return observer.next(value);
            },
            () => observer.complete()));
    }

    /**
     * @param {function(*): boolean} condition - Predicate.
     * @returns {Observable} Emits only values for which `condition` is truthy.
     */
    filter(condition) {
        return new Observable((observer) => this.subscribe(
            (value) => {
                if (condition(value)) {
                    observer.next(value);
                }
            },
            () => observer.complete()));
    }

    /**
     * Mirrors this stream until the notifier `o` emits its first value.
     *
     * The result completes when either the source completes or `o` emits.
     * Completion of `o` without a value does not complete the result and is
     * handled silently. If `o` emits synchronously on subscription the
     * result completes immediately and the source is never subscribed.
     *
     * @param {Observable} o - Notifier stream.
     * @returns {Observable}
     */
    takeUntil(o) {
        return new Observable((observer) => {
            let finished = false;
            let stopNotifier = null;
            let stopSource = null;

            // Idempotent: each subscription is released at most once.
            const release = () => {
                if (stopNotifier) {
                    const stop = stopNotifier;
                    stopNotifier = null;
                    stop();
                }
                if (stopSource) {
                    const stop = stopSource;
                    stopSource = null;
                    stop();
                }
            };
            const finish = () => {
                if (finished) {
                    return;
                }
                finished = true;
                observer.complete();
                release();
            };

            // The notifier may fire before `subscribe` returns, so the
            // callback must not touch the handle being assigned here.
            stopNotifier = o.subscribe(() => finish(), () => { });
            if (finished) {
                release();
                return release;
            }
            stopSource = this.subscribe((value) => observer.next(value), finish);
            if (finished) {
                // Source completed synchronously; drop the notifier too.
                release();
            }
            return release;
        });
    }

    /**
     * Maps every value to an inner stream and merges the inner emissions.
     *
     * The result completes when this (outer) stream completes; inner
     * subscriptions still running at that point, or when the consumer
     * unsubscribes, are cancelled. Inner completions are handled silently.
     *
     * @param {function(*): Observable} streamCreator - Builds the inner stream.
     * @returns {Observable}
     */
    flatMap(streamCreator) {
        return new Observable((observer) => {
            const activeInner = new Set();
            const stopOuter = this.subscribe((value) => {
                let innerDone = false;
                let stopInner = null;
                stopInner = streamCreator(value).subscribe(
                    (inner) => observer.next(inner),
                    () => {
                        innerDone = true;
                        // Null while the inner stream completes synchronously.
                        if (stopInner) {
                            activeInner.delete(stopInner);
                        }
                    });
                if (!innerDone) {
                    activeInner.add(stopInner);
                }
            }, () => observer.complete());
            return () => {
                stopOuter();
                for (const stopInner of activeInner) {
                    stopInner();
                }
                activeInner.clear();
            };
        });
    }

    /**
     * Emits a running accumulation of the source values.
     *
     * @param {*} initialVal - Starting accumulator (not emitted itself).
     * @param {function(*, *): *} fun - `(accumulator, value) => nextAccumulator`.
     * @returns {Observable} Emits the accumulator after each source value.
     */
    scan(initialVal, fun) {
        return new Observable((observer) => {
            let accumulator = initialVal;
            return this.subscribe((value) => {
                accumulator = fun(accumulator, value);
                observer.next(accumulator);
            }, () => observer.complete());
        });
    }
}
//# sourceMappingURL=observable.js.map