# Reactive Programming
# 简介
响应式编程:是一种面向数据流和变化传播的编程范式。
为了应对高并发服务器端开发场景,由微软在 2009 年提出。
# Click a Button
让我们从最简单的“点击按钮”开始理解,点击两次按钮将会产生这样的事件流。
-----v-----v----e----c->
整体是一个时间轴,v 是点击按钮产生的事件数据;e 是异常;c 是完成信号,表示你不再订阅这个事件流。
我们把这个事件流称为 clickStream,接下来,让我们尝试创建一个统计点击次数的事件流。
clickStream
-----v-----v----e----c->
clickNumStream = clickStream.map(v => 1)
-----1-----1----e----c->
counterStream = clickNumStream.scan((res, cur) => { res += cur }, 0)
-----1-----2----e----c->
这样一来,通过两次 Stream 的转换,我们就得到了一个统计点击次数的流。
下面再来看复杂一点的检测“双击”事件的流,它先把 250ms 内的点击事件累积到一个列表中,然后过滤掉长度为 1 的列表。
以上只是对 Reactive Programming 的简单描述。在使用广泛的 Rx 系列库中,我们一般把这里所说的流称作 Observable,或者说把流作为 Observable 的表现。关于它的由来,让我们接着往下看。
# 深入原理
现在让我们深入了解一下 Observable 如何诞生的。
# Getter & Setter
首先我们定义 Getter 和 Setter:
function get() {
return 1;
}
function set(v) {
console.log(v);
}
很容易理解,Getter 是生产者,他生产出 1 并返回;Setter 是消费者,他消费了 v 并把它打印出来。
# Getter-Getter & Setter-Setter
下面我们定义 Getter-Getter 和 Setter-Setter:
function getGetter() {
let i = 1;
return function get() {
const tmp = i;
i++;
return tmp;
}
}
let next = getGetter();
next();
function set(v) {
console.log(v);
}
function setSetter(callback) {
callback(1);
}
setSetter(set);
很熟悉吧!Getter-Getter 就是闭包,而 Setter-Setter 就是回调!
从生产消费的视角来看,Getter-Getter 中的 get 是消费者,消费了 getGetter 中维护的 i;而 Setter-Setter 中的 set 是消费者,消费了 setSetter 中传入的 1。
从调用关系来看,Getter-Getter 中是调用消费者导致生产,为 pull 模型;而 Setter-Setter 中是调用生产者触发消费,为 push 模型。
# Iterable
这时,我们又发现这个 getGetter 长得有点像一个迭代器!没错,它已经拥有了可重入和有状态两个性质了,再给它加上序列结束,并实现 Symbol.iterator 协议,它就会成为一个 Iterable:
const getGetter = {
[Symbol.iterator]: () => {
let i = 1;
return {
next: () => {
if (i <= 5) {
const tmp = i;
i++;
return { done: false, value: tmp };
} else {
return { done: true };
}
}
}
}
}
当然,这样写起来还是挺麻烦的,所以 JS 为我们提供了 Generator 语法糖:
function* getGetter() {
let i = 1;
while (true) {
if (i <= 5) {
yield i;
i++;
} else {
return;
}
}
}
# Promise
看到 Getter-Getter 变得这么好用,Setter-Setter 馋哭了。然而,Setter-Setter 本身存在很大的问题:它可能是同步、也可能是异步;它可能分发一个或多个值。
这个时候不得不提到老熟人 Promise,它其实就是一种特殊的 Setter-Setter,它为 S-S 加了三个限制:set 只会被异步调用;set 只会被调用一次;允许传入第二个 set 来处理错误。
function setSetter(set) {
set(10)
set(10)
}
console.log('before');
setSetter(console.log);
console.log('after');
// before
// 10
// 10
// after
const promise = new Promise(function setSetter(set) {
set(10);
set(10);
});
console.log('before');
promise.then(console.log);
console.log('after');
// before
// after
// 10
async/await 实现了将 Promise 转化为同步的形式,但依然只调用 set 一次。
const promise = new Promise(function setSetter(set) {
set(10);
set(10);
});
console.log('before');
console.log(await promise);
console.log('after');
// before
// 10
// after
# Observable
Promise 有多重限制,依然不够强大,我们要怎样才能让 Setter-Setter 拥有自己的 Iterable 呢?此时,Observable 作为 Itrable 的复制出现了:Itrable 是一个对象,Observable 也是一个对象;Iterable 有 iterate 方法(Symbol.iterator),Observable 有 observe 方法(subscribe);iterate 方法是一个 Iterator 对象的 Getter,observe 方法是一个 Observer 对象的 Setter;Iterator 对象有 next 方法作为 Getter,Observer 对象有 next 方法作为 Setter。
除此之外,Observer 对象还允许两个函数,分别处理 complete 和 error。
const observable = {
subscribe: (observer) => {
let i = 1;
let clock = setInterval(() => {
if (i <= 5) {
observer.next(i);
i++;
} else {
observer.complete();
clearInterval(clock);
}
}, 1000);
return function unsubscribe() {
clearInterval(clock);
}
}
};
const subscription = observable.subscribe({
next: i => console.log(i),
complete: () => console.log('done'),
error: (err) => console.error(err)
});
setTimeout(() => {
subscription.unsubscribe();
}, 2500)
以上就是一个经典的 Observable 实现,要注意的是,为了实现 unsubscribe 的功能,这里的 subscribe() 需要返回一个函数,所以 observe 方法已经不是一个纯的 Setter 了。
其实,Observable 就是观察者模式和迭代器模式结合的产物:在事件流中,通过调用 next 来处理事件;Observer 通过 Observable 提供的 subscribe 方法订阅**(把 subscribe 称为“观察”就好理解了)**,Observable 通过回调 next 方法发布。
在 Rx 中,对 Observable 加了一些保护原则:调用 unsubscribe 之后,任何方法都不能再被调用;在触发 complete 和 error 后,会自动调用 unsubscribe;出现异常时,也会自动调用 unsubscribe。
# Operator
现在我们已经有了 Observable 的实现,可以来看看 map 的实现:
function map(source, fn) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(fn(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return source.subscribe(mapObserver);
});
}
其实 map 只是 Operator 的一种:Operator 都会接收一个 Observable 对象 source,再返回一个 Observable 对象 destination;当我们用 observer 订阅 destination 时,destination 内部也会自动订阅 source。
通过 Operator,我们就可以构建一个 Observable 的链,在链末尾订阅就能让数据在产生后通过整个链被多次处理。
看到这里我们可以发现,Observable 本质上是一个函数,把 data 加工成 newData,而函数是没有同步异步之分的(aysnc 函数只是语法糖,本质上是 Promise 有异步而不是函数有异步),只有数据有。所以数据流可以像 [Click a Button](##Click a Button) 中那样写成一个时间轴,这些数据经过一个 Observable 的加工,会变成另一个数据流,因此,我们才说流是 Observable 的表现。这就是 Rx 解决异步问题
# Create
现在我们离统计按钮点击次数就差一步了,怎样把对按钮的点击转化为事件流呢?其实也不用理解,这些 Rx 都帮我们准备好了。
Rx 提供了大量的 Create 方法,比如 fromEvent 可以把对按钮的点击转化为事件流,不论是 DOM 事件或是定时任务,只要被转化成数据流,什么事件、同步、异步就都不用管了,剩下的就都在 Rx 的函数式编程的掌控之中了!
# 参考
从类型理解rxjs与async generator (一) (opens new window)