# Reactive Programming

# 简介

响应式编程:是一种面向数据流和变化传播的编程范式。

为了应对高并发服务器端开发场景,由微软在 2009 年提出。

# Click a Button

让我们从最简单的“点击按钮”开始理解,点击两次按钮将会产生这样的事件流。

-----v-----v----e----c->

整体是一个时间轴,v 是点击按钮产生的事件数据;e 是异常;c 是完成信号,表示你不再订阅这个事件流。

我们把这个事件流称为 clickStream,接下来,让我们尝试创建一个统计点击次数的事件流。

clickStream
-----v-----v----e----c->
clickNumStream = clickStream.map(v => 1)
-----1-----1----e----c->
counterStream = clickNumStream.scan((res, cur) => { res += cur }, 0)
-----1-----2----e----c->

这样一来,通过两次 Stream 的转换,我们就得到了一个统计点击次数的流。

下面再来看复杂一点的检测“双击”事件的流,它先把 250ms 内的点击事件累积到一个列表中,然后过滤掉长度为 1 的列表。

以上只是对 Reactive Programming 的简单描述。在使用广泛的 Rx 系列库中,我们一般把这里所说的流称作 Observable,或者说把流作为 Observable 的表现。关于它的由来,让我们接着往下看。

# 深入原理

现在让我们深入了解一下 Observable 如何诞生的。

# Getter & Setter

首先我们定义 Getter 和 Setter:

function get() {
    return 1;
}
function set(v) {
    console.log(v);
}

很容易理解,Getter 是生产者,他生产出 1 并返回;Setter 是消费者,他消费了 v 并把它打印出来。

# Getter-Getter & Setter-Setter

下面我们定义 Getter-Getter 和 Setter-Setter:

function getGetter() {
    let i = 1;
    return function get() {
        const tmp = i;
        i++;
        return tmp;
    }
}
let next = getGetter();
next();
function set(v) {
    console.log(v);
}
function setSetter(callback) {
    callback(1);
}
setSetter(set);

很熟悉吧!Getter-Getter 就是闭包,而 Setter-Setter 就是回调!

从生产消费的视角来看,Getter-Getter 中的 get 是消费者,消费了 getGetter 中维护的 i;而 Setter-Setter 中的 set 是消费者,消费了 setSetter 中传入的 1。

从调用关系来看,Getter-Getter 中是调用消费者导致生产,为 pull 模型;而 Setter-Setter 中是调用生产者触发消费,为 push 模型。

# Iterable

这时,我们又发现这个 getGetter 长得有点像一个迭代器!没错,它已经拥有了可重入和有状态两个性质了,再给它加上序列结束,并实现 Symbol.iterator 协议,它就会成为一个 Iterable:

const getGetter = {
  [Symbol.iterator]: () => {
    let i = 1;
    return {
      next: () => {
        if (i <= 5) {
          const tmp = i;
          i++;
          return { done: false, value: tmp };
        } else {
          return { done: true };
        }
      }
    }
  }
}

当然,这样写起来还是挺麻烦的,所以 JS 为我们提供了 Generator 语法糖:

function* getGetter() {
  let i = 1;
  while (true) {
    if (i <= 5) {
      yield i;
      i++;
    } else {
      return;
    }
  }
}

# Promise

看到 Getter-Getter 变得这么好用,Setter-Setter 馋哭了。然而,Setter-Setter 本身存在很大的问题:它可能是同步、也可能是异步;它可能分发一个或多个值

这个时候不得不提到老熟人 Promise,它其实就是一种特殊的 Setter-Setter,它为 S-S 加了三个限制:set 只会被异步调用;set 只会被调用一次;允许传入第二个 set 来处理错误。

function setSetter(set) {
  set(10)
  set(10)
}
console.log('before');
setSetter(console.log);
console.log('after');

// before
// 10
// 10
// after

const promise = new Promise(function setSetter(set) {
  set(10);
  set(10);
});
console.log('before');
promise.then(console.log);
console.log('after');

// before
// after
// 10

async/await 实现了将 Promise 转化为同步的形式,但依然只调用 set 一次。

const promise = new Promise(function setSetter(set) {
  set(10);
  set(10);
});
console.log('before');
console.log(await promise);
console.log('after');

// before
// 10
// after

# Observable

Promise 有多重限制,依然不够强大,我们要怎样才能让 Setter-Setter 拥有自己的 Iterable 呢?此时,Observable 作为 Itrable 的复制出现了:Itrable 是一个对象,Observable 也是一个对象;Iterable 有 iterate 方法(Symbol.iterator),Observable 有 observe 方法(subscribe);iterate 方法是一个 Iterator 对象的 Getter,observe 方法是一个 Observer 对象的 Setter;Iterator 对象有 next 方法作为 Getter,Observer 对象有 next 方法作为 Setter。

除此之外,Observer 对象还允许两个函数,分别处理 complete 和 error。

const observable = {
  subscribe: (observer) => {
    let i = 1;
    let clock = setInterval(() => {
      if (i <= 5) {
        observer.next(i);
        i++;
      } else {
        observer.complete();
        clearInterval(clock);
      }
    }, 1000);
    return function unsubscribe() {
      clearInterval(clock);
    }
  }
};

const subscription = observable.subscribe({
  next: i => console.log(i),
  complete: () => console.log('done'),
  error: (err) => console.error(err)
});
setTimeout(() => {
  subscription.unsubscribe();
}, 2500)

以上就是一个经典的 Observable 实现,要注意的是,为了实现 unsubscribe 的功能,这里的 subscribe() 需要返回一个函数,所以 observe 方法已经不是一个纯的 Setter 了。

其实,Observable 就是观察者模式和迭代器模式结合的产物:在事件流中,通过调用 next 来处理事件;Observer 通过 Observable 提供的 subscribe 方法订阅**(把 subscribe 称为“观察”就好理解了)**,Observable 通过回调 next 方法发布。

在 Rx 中,对 Observable 加了一些保护原则:调用 unsubscribe 之后,任何方法都不能再被调用;在触发 complete 和 error 后,会自动调用 unsubscribe;出现异常时,也会自动调用 unsubscribe。

# Operator

现在我们已经有了 Observable 的实现,可以来看看 map 的实现:

function map(source, fn) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(fn(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    };
    return source.subscribe(mapObserver);
  });
}

其实 map 只是 Operator 的一种:Operator 都会接收一个 Observable 对象 source,再返回一个 Observable 对象 destination;当我们用 observer 订阅 destination 时,destination 内部也会自动订阅 source。

通过 Operator,我们就可以构建一个 Observable 的链,在链末尾订阅就能让数据在产生后通过整个链被多次处理。

看到这里我们可以发现,Observable 本质上是一个函数,把 data 加工成 newData,而函数是没有同步异步之分的(aysnc 函数只是语法糖,本质上是 Promise 有异步而不是函数有异步),只有数据有。所以数据流可以像 [Click a Button](##Click a Button) 中那样写成一个时间轴,这些数据经过一个 Observable 的加工,会变成另一个数据流,因此,我们才说流是 Observable 的表现。这就是 Rx 解决异步问题

# Create

现在我们离统计按钮点击次数就差一步了,怎样把对按钮的点击转化为事件流呢?其实也不用理解,这些 Rx 都帮我们准备好了。

Rx 提供了大量的 Create 方法,比如 fromEvent 可以把对按钮的点击转化为事件流,不论是 DOM 事件或是定时任务,只要被转化成数据流,什么事件、同步、异步就都不用管了,剩下的就都在 Rx 的函数式编程的掌控之中了!

# 参考

RxJS光速入门 (opens new window)

从类型理解rxjs与async generator (一) (opens new window)

JAVASCRIPT GETTER-SETTER PYRAMID (opens new window)

Observable详解 (opens new window)

Last Updated: 10/26/2021, 11:45:49 AM