响应式编程入门

什么是响应式编程

直接举个 🌰

比如我们有个需求,要统计发送 http 请求的次数,那么意味着我们会有两个模块,一个是 http request 用于发送 http 请求,一个是 counter 用于统计次数。其实我们的需要就是当 http 请求发送时,counter 会执行某个方法令其记录的数据 +1。 最直接的方法就是这样:

class HttpRequest {
  send(){
    // 实际发送 http 请求的实现
    counter.increment();
  }
}

当 send 方法被调用时,直接执行 counter.increment(),这样 counter 内的数据就 +1 了,然后我们的需求就做完了。 当时我们仔细想想,作为一个 http 模块,它要做的事情应该只是发送一个 http 的请求,它为什么要关心计数的问题,计数明明应该是 counter 需要关心的事情。这样写的话,其实 http 模块和 counter 模块其实是耦合在一起的。 那么我们换个思路来想,既然说计数这个事情应该是由 counter 来做,那么 counter.increment() 的行为是不是应该 counter 自己来调用。 怎么实现呢:

class HttpRequest {
  send(){
    // 实际发送 http 请求的实现
    eventBus.emit("requestDone")
  }
}
class Counter {
  constructor(){
    eventBus.on("requestDone").then(() => {
      this.increment();
    })
  }
}

这样的话,counter 的 increment 就可以是一个私有方法,就不会被外部去改变。而 http 模块也不再需要知道 counter 模块,它只需要完成自己的本职工作就可以了(发送 http 请求)。 所以响应式编程的优势是什么?就是我们软件工程一直提倡的:关注度分离

前端的响应式编程

其实大家作为前端开发人员,我相信大家都已经接触过响应式编程了。因为我们现在主流的前端框架都是响应式的。联想一下,在各个前端框架中,我们现在要改变视图,不是用 jquery 命令式地去改变 dom,而是通过 setState(),修改 this.data 或修改 $scope.data...。我们修改的明明只是数据,但是只要数据更新了,我们就不用管了,这些前端框架会自动帮我们把数据渲染成视图。

20221109183505@2x
20221109183505@2x
所以有了这些响应式的框架,我们平时开发的心智模型降低了很多,我们只需要去操作数据就可以了,然而,修改数据这件事听着就很像命令式,尤其是随着产品功能越来越复杂,我们要管理的状态也越来越庞大,如果每个地方都可以随意修改这些状态的话,这些状态就会变得越来越不可控,越来越难以追溯。所以当我们开始使用这些响应式的前端框架开发比较大型的项目时,状态管理尤为重要,如果还是以之前命令式的思想去修改状态,无非是从一个地狱跳入到另一个地狱。 为了解决管理状态这个痛点,好多状态管理的库应运而生,不管是 Redux 还是 Mobx,他们解决的方向都是为了让状态的变化可预测,另外再提供一些撤销/重做,时间旅行等附加功能。当时他们都没有解决状态从产生变化再实际 set state 的这段过程该怎么管理。所以我们还是免不了写一大堆命令式的代码去修改状态。 既然现在主流的框架都是响应式的,我们也慢慢摒弃了 jquery 这种命令式的开发视图的模式,说明响应式对于我们前端开发来说是适合的,那么既然视图的更新我们已经做到了响应式的,是不是数据的更新也可以弄成响应式的?视图的响应式开发让我们避免了操作 DOM,而数据的响应式开发则会让我们避免操作 store。我们只需要关心数据的来源即可,来源的数据产生了变化,store 自然而然会跟着变化。

怎么实现响应式编程

今天我们主要讲最后一个实现方式:数据流

什么是数据流 Stream

大家都知道数组,数组是什么?是内存中的一片空间来存储我们数据的数据结构。所以数组是空间上的序列。 而数据流,则是时间上的数据序列。

20221109183637@2x
20221109183637@2x
有了数据流之后可以做什么呢?还是继续和数组来对比。

map 操作

数组可以通过 map,把一个数据转换成另一个数组:

20221109183745@2x
20221109183745@2x
而数据流也可以通过 map,把一个数据流转换成另一个数据流:
20221109183825@2x
20221109183825@2x

filter 操作

数组还可以通过 filter,生成一个新数组,数据为过滤后的结果:

20221111091214@2x
20221111091214@2x
数据流同样也可以,通过 filter 来生成一个新数据流,数据为过滤后的结果:
20221111091304@2x
20221111091304@2x

时间维度的处理

相比于数组,由于数据流是时间概念的,所以还可以做一些和时间有关的操作 比如我生成一个新的数据流,相比于之前的数据流延迟 2s

20221109184121@2x
20221109184121@2x
或者按照时间顺序,合并两个数据流
20221109184230@2x
20221109184230@2x
RxJS 中内置了许多用于操作数据流的操作符,基本满足我们日常的使用,即使不满足,我们也可以自己写操作符。 https://rxjs.dev/api

数据是如何产生的

我们先看一下 React 是怎么渲染页面的:

ReactDOM.createRoot(document.getElementById("root")).render(
  <App />
);

其实就是这个 render 方法,这个方法只会在加载时执行一次,我们传入一个 <App /><App />是什么?我们都知道,它就是一个函数,函数签名是function App():JSX.Element 这个函数没有参数,但是返回的结果每次都不一样,所以这个函数不是纯函数,说明是有副作用导致了这个函数里面的数据发生了变化,从而令这个函数返回不同的内容,那么这些副作用,就是产生数据的地方,他们是可以枚举的:

  • Event - 浏览器的一系列原生事件
  • XHR - XMLHttpRequest
  • Timers - setTimeout( ) 、setInterval( )

那么我们只要管理好这些数据变化的来源,再保证数据的流转过程,这样整个数据变化又变成了一个纯函数,即只要我们知道了来源的数据变更,就一定能推断出最终的结果。

20221109184617@2x
20221109184617@2x

一些小的应用示例

响应式编程提高了代码的抽象层级,所以你可以只关注定义了业务逻辑的那些相互依赖的事件,而非纠缠于大量的实现细节。RP 的代码往往会更加简明。

示例一

如果我想每点击一次就打印一句“Clicked”,我们一般都会这样写:

document.addEventListener('click', () => console.log('Clicked!'));

很直观很简洁,没有什么问题。 如果用响应式的方式去实现,则会是这样:

import { fromEvent } from 'rxjs';

fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));

大家可能看着没什么区别,感觉就是 api 的名称变了而已,那我稍微换一下写法:

import { fromEvent } from 'rxjs';

const clickEvent$ = fromEvent(document, 'click');
clickEvent$.subscribe(() => console.log('Clicked!'));

这样大家可能就能看出不一样的地方了,前者是监听了一个事件,所后面所有的事情都只能在事件的回调函数里面处理。而我后者则是订阅的数据流,我所有的业务逻辑,都可以通过改变这个数据流来实现。 比如说,我开始有新需求了,我想统计我点击次数,每次点击打印的内容 + 1。 按照之前命令式的写法,我们会这样写:

let count = 0;
document.addEventListener('click', () => console.log(`Clicked ${++count} times`));

这下好了,我们需要维护状态了,因为我们需要保存一下之前打印的内容,从而在新打印的时候 + 1。 命令式编程就是这样,先定义数据,然后通过指令改变数据。 如何用响应式写法去做呢:

import { fromEvent, scan } from 'rxjs';

fromEvent(document, 'click')
  .pipe(scan((count) => count + 1, 0))
  .subscribe((count) => console.log(`Clicked ${count} times`));

看,依然是一个纯函数,我们不再需要维护一个类似 count 的状态。

继续,我们再添加新的需求:我想限制一秒内只能点击一次 换成命令式的写法该怎么做?我们又需要维护一个状态,来保存上一次点击的时间,然后在点击触发的时候判断两次时间有没有超过一秒,如果未超过,就不打印东西。

let conut = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate) {
    console.log(`你点击了${++conut}次`);
    lastClick = Date.now();
  }
});

现在,我们已经需要维护两个状态了。而随着功能迭代越来越多,不可避免得我们要维护的状态也会越来越多。而命令式编程带来的问题就是每条命令都是离散的,我们如果想了解一段代码的逻辑,就只能跟着代码运行的过程一条一条去看。代码多了,一个代码块里就可能会参杂好几处逻辑,我们代码的可读性就会变得越来越差。

import { of, map, throttleTime, fromEvent, scan, count } from 'rxjs';

const clickEvent$ = fromEvent(document, 'click');

clickEvent$
  .pipe(
    throttleTime(1000),
    scan((count) => count + 1, 0)
  )
  .subscribe((count) => console.log(`你点击了${count}次`));

而反观响应式的编程方式,基本上就是靠纯函数的组合来实现业务逻辑,读代码时也不需要安装代码执行的步骤一条一条去看,因为每个函数都是自描述的,就像是用一些单词,拼成了一条完整的句子。

示例二

实现一个简单的拖拽功能

const box = document.getElementById('box');
// 获取鼠标点击时在 div 中的相对位置
box.onmousedown = (ev) => {
  const { x, y } = getTranslate(box);
  const relaX = ev.clientX - x;
  const relaY = ev.clientY - y;

  // 获取当前鼠标位置,减去与 div 的相对位置得到当前 div 应该被拖拽的位置
  document.onmousemove = (ev) => {
    setTranslate(box, { x: ev.clientX - relaX, y: ev.clientY - relaY });
  };
  document.onmouseup = (ev) => {
    document.onmousemove = null;
    document.onmouseup = null;
  };
};

https://stackblitz.com/edit/typescript-8pa41d?file=index.ts

如果用数据流编程的思路,改怎么做。 首先分析一下,为了相应地移动小方块,我们需要知道的信息有:

  1. 小方块被拖拽时的初始位置
  2. 小方块在被拖拽着移动时,需要移动到的新位置

而怎么理解拖拽呢?我们可以用弹珠图来直观得表示:

20221109185017@2x
20221109185017@2x
这样我们就可以很直观的看出,drag 的数据流,就是取鼠标按下和抬起之间的 mousemove 数据流就可以了,这么我们只需要按照我们的思路操作数据流即可,而 RxJS 内置的操作符则方便了我们的操作。 http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-switchMap http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-takeUntil

const box = document.getElementById('box');
const mouseDown$ = fromEvent(box, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');

const drag$ = mouseDown$.pipe(
  map((event: MouseEvent) => {
    return {
      pos: getTranslate(box),
      event,
    };
  }),
  switchMap((initialState) => {
    const initialPos = initialState.pos;
    const { clientX, clientY } = initialState.event;
    return mouseMove$.pipe(
      map((moveEvent: MouseEvent) => {
        return {
          x: moveEvent.clientX - clientX + initialPos.x,
          y: moveEvent.clientY - clientY + initialPos.y,
        };
      }),
      takeUntil(mouseUp$)
    );
  })
);

drag$.subscribe((pos) => {
  setTranslate(box, pos);
});

https://stackblitz.com/edit/rxjs-6jxpu7?devtoolsheight=60&file=index.ts

添加初始延迟

需求:在拖拽的实际应用中,有时会希望有个初始延迟。

import { delay, fromEvent, takeUntil, map, switchMap, of } from 'rxjs';

const box = document.getElementById('box');
const mouseDown$ = fromEvent(box, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');

const drag$ = mouseDown$.pipe(
  switchMap((event: MouseEvent) => {
    return of({
      pos: getTranslate(box),
      event,
    }).pipe(delay(200), takeUntil(mouseMove$));
  }),
  switchMap((initialState) => {
    const initialPos = initialState.pos;
    const { clientX, clientY } = initialState.event;
    return mouseMove$.pipe(
      map((moveEvent: MouseEvent) => {
        return {
          x: moveEvent.clientX - clientX + initialPos.x,
          y: moveEvent.clientY - clientY + initialPos.y,
        };
      }),
      takeUntil(mouseUp$)
    );
  })
);

drag$.subscribe((pos) => {
  setTranslate(box, pos);
});

拖拽接龙

实现拖动一个方块时,其他方块会在一定的延迟后跟着拖动的方块一起动。

20221109185425@2x
20221109185425@2x

import {
  fromEvent,
  map,
  interval,
  switchMap,
  takeUntil,
  mergeMap,
  tap,
  take,
} from 'rxjs';

const headBox = document.getElementById('head');
const boxes = document.getElementsByClassName('box');
const mouseDown$ = fromEvent(headBox, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');
const delayBoxes$ = interval(100).pipe(
  take(boxes.length),
  map((n) => boxes[n])
);

const drag$ = mouseDown$.pipe(
  map((e: MouseEvent) => {
    const pos = getTranslate(headBox);
    return {
      pos,
      event: e,
    };
  }),
  switchMap((initialState) => {
    const initialPos = initialState.pos;
    const { clientX, clientY } = initialState.event;
    return mouseMove$.pipe(
      map((moveEvent: MouseEvent) => ({
        x: moveEvent.clientX - clientX + initialPos.x,
        y: moveEvent.clientY - clientY + initialPos.y,
      })),
      takeUntil(mouseUp$)
    );
  })
);

drag$
  .pipe(
    mergeMap((pos) => {
      return delayBoxes$.pipe(
        tap((box) => {
          setTranslate(box, pos);
        })
      );
    })
  )
  .subscribe();

https://stackblitz.com/edit/rxjs-t2xwfi?devtoolsheight=60&file=index.ts

总结

命令式编程虽然建模很容易,但是只是针对你已知情况的复刻,它要求你必须知道事情的全部的原因和结果,对一个问题的中间变化,各种情况都要了如指掌。各种各样不同的情境也都需要考虑到,只有这样才能把现实问题在计算机中复刻出来。因为一个运算一个操作,它就是在你已经知道了真是的前因后果后定义出来的,你无法定义你不知道结果的操作。 而函数响应式编程它复刻的就不是某个具体的问题了,而是这个问题背后的逻辑和规律。然后根据这个逻辑和规律去重建整个系统。 有兴趣的话大家可以了解下图灵机和λ演算法。命令式编程的思想就是来源于图灵机,函数编程的思想就是来源于λ演算法。二者是等价的,都是图灵完备,只不过是解决同一个问题的不同思路而已。