RxJS 缓存高级教程

原文链接:https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html

在开发 Web 应用程序时,性能一般都是出于最高优先级的。对于 Angular 项目,我们有很多途径去提升程序性能,例如摇树优化(tree-shaking)、AoT(ahead-of-time 编译)、模块懒加载(lazy loading)以及缓存。为了能够更好地概览全局,以便提高 Angular 应用程序的性能,我们强烈推荐使用 Minko GechevAngular 性能检查表。本文主要聚焦于缓存

事实上,缓存是提升网站性能的最有效的方法之一,尤其是当用户出于受限网络带宽或慢速网络的情况。

有很多种缓存数据或资源的方法。静态资源通常使用标准的浏览器缓存或 Service Worker 进行缓存。当然,Service Worker 也可以缓存 API 请求,但它们更适合缓存图片、HTML、JS 或 CSS 等文件。缓存系统数据,我们则会选用另外的机制。

不管我们选择怎样的机制,缓存都会改善系统的响应性降低网络消耗在网络中断的情况下依然能够使用内容。换句话说,当内容在更接近用户的地方被缓存,例如就在客户端,请求就不会引起另外的网络活动;缓存的数据可以被更快返回,因为我们不需要进行完整的网络周期。

这篇文章我们将使用 RxJS 以及 Angular 提供的各种工具实现一种高级的缓存机制。

动机

一直以来,都有一个疑问:如何在一个频繁使用Observable对象的 Angular 程序中缓存数据。很多人都知道如何使用Promise缓存数据,但是对如何在函数式的响应式编程中缓存数据束手无策。因为后者的复杂性(庞大的 API)、完全不同的使用方式(从命令式编程到指令式编程)以及许多概念。因此,将基于Promise的缓存系统移植到Observable是非常困难的,尤其是还需要实现一些高级功能的时候。
Angular 应用程序通常使用由HttpClientModule提供的HTTPClient实现 HTTP 请求。它的所有 API 都是基于Observable的。这意味着,HTTPClient的函数,例如getpostputdelete都返回一个ObservableObservable天生是懒的,只有当我们调用了subscribe函数之后才会发送请求。但是,对同一个Observable多次调用subscribe函数,会一遍一遍地创建源Observable对象,也就是为每一次定于执行一次请求。我们将这种模式成为冷模式(cold)。
如果你对此完全不了解,可以阅读我们的另外一篇文章《Observable 的冷模式和热模式》

这种行为使得实现Observable缓存机制变得有些棘手。简单的实现通常需要大量固定模式的代码,而且最终可能需要绕过 RxJS。这是一种解决思路,但如果我们依然希望使用Observable的强大功能,这种实现就不值得推荐。简单来说,我们并不想给法拉利安装一个摩托车引擎,对吧?

需求

在我们深入代码之前,首先定义好我们的高级缓存机制的需求。

我们要开发一个名为笑话世界的应用。这是一个简单的 app,随机显示给定分类里面的笑话。为了尽可能简单,我们只给出一个分类。

这个 app 有三个组件:AppComponentDashboardComponentJokeListComponent

AppComponent是程序入口,显示一个工具栏和一个安装当前路由状态填充的<router-outlet>

DashboardComponent只用来显示分类列表。我们可以从这里导航到JokeListComponentJokeListComponent负责将笑话列表显示到屏幕。

笑话由 Angular 的HttpClient服务从服务器获取。为保持组件的响应、解耦,我们要创建一个JokeService,来帮助我们获取数据。组件只需要注入这个服务,通过其公开的 API 访问数据即可。
以上所有都是我们的系统架构,并没有引入缓存。

当我们从主页到列表视图时,我们可以从缓存请求数据,而不是每次都从服务器获取。缓存中的数据每 10 秒自动更新。

当然,每 10 秒获取数据并不是每个产品都需要遵守的固定准则,我们可能需要更复杂的实现来更新缓存(例如使用 web socket 推送更新)。但是,现在我们可以尽可能保持简单,集中精力解决缓存的问题。

无论如何,我们都希望收到某种更新的通知。就我们的程序而言,我们不希望自动更新 UI(JokeListComponent)的数据,而是用户要求 UI 更新时才去更新缓存。为什么?想象下这样的场景:用户正在阅读一个笑话,因为数据的自动更新,所有笑话突然都消失了。这无疑非常令人反感,是一种非常差的用户体验。因此,我们的用户在有新数据的时候会收到通知。

为了更有趣一点,我们还希望用户能够强制刷新缓存。这与仅仅更新 UI 不同,因为强制刷新意味着要从服务器请求数据、更新缓存、然后更新 UI。

现在我们总结一下我们想要干什么:

  • 程序有两个组件,当从组件 A 导航到组件 B 时,组件 B 的数据最好从缓存获取,而不是每次都从服务器获取
  • 每 10 秒更新缓存
  • UI 中的数据并不会自动更新,而是由用户强制更新
  • 用户可以强制刷新,从服务器重新获取数据、更新缓存和 UI

下面是我们即将构建的 app 预览图:

实现基本的缓存

我们从一个简单的实现开始,逐渐过渡到最终的全功能版本。

第一步是创建一个新的服务。

然后,我们添加两个接口,一个用来描述Joke的属性,另一个用来描述 HTTP 请求的返回。这样的接口会让我们的程序更符合 TypeScript 的要求,同时也更方便开发。

export interface Joke {
  id: number;
  joke: string;
  categories: Array<string>;
}

export interface JokeResponse {
  type: string;
  value: Array<Joke>;
}

下面我们实现JokeService。我们不想透露数据究竟是从缓存获取的,还是从服务器获取的,因此,我们只提供一个返回值类型为Observablejokes属性,用于获取笑话列表。

为了执行 HTTP 请求,我们需要为我们的服务注入HttpClient

下面是JokeService的代码框架:

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

@Injectable()
export class JokeService {

  constructor(private http: HttpClient) { }

  get jokes() {
    ...
  }
}

下面,我们实现一个私有的requestJokes()函数,通过HttpClient的 GET 请求获取笑话列表。

import { map } from 'rxjs/operators';

@Injectable()
export class JokeService {

  constructor(private http: HttpClient) { }

  get jokes() {
    ...
  }

  private requestJokes() {
    return this.http.get<JokeResponse>(API_ENDPOINT).pipe(
      map(response => response.value)
    );
  }
}

现在,我们有了实现获取笑话的函数的一切准备。

一个显而易见的实现是,直接返回this.requestJokes(),但这无法满足我们的需要。我们知道,所有HttpClient暴露的函数,例如get(),都是返回一个冷Observable。这意味着每个订阅者都会重新出发完整的数据流,从而带来额外的 HTTP 请求。毕竟,缓存的意义就在于提高系统的加载时间,将网络请求限制到最低的水平。

所以,我们想让我们的流变成热的。不仅仅如此,每一个新的订阅者应该获取到最近的缓存值。事实上,有一个很方便的操作符可以实现这一点:shareReplay。这个操作符返回一个Observable对象。该对象会在底层共享一个订阅,也就是this.requestJokes()返回的那个Observable

另外,shareReplay接受一个可选参数bufferSize,对于我们的用例非常有用。bufferSize决定了重现缓存(replay buffer)的最大元素数,也就是被缓存、能够重现给每一个订阅者的元素数。在我们的场景中,我们只需要最近的一个值,因此将bufferSize设置为 1。

我们看一下实际代码,看看我们刚刚学到了什么:

import { Observable } from 'rxjs/Observable';
import { shareReplay, map } from 'rxjs/operators';

const API_ENDPOINT = 'https://api.icndb.com/jokes/random/5?limitTo=[nerdy]';
const CACHE_SIZE = 1;

@Injectable()
export class JokeService {
  private cache$: Observable<Array<Joke>>;

  constructor(private http: HttpClient) { }

  get jokes() {
    if (!this.cache$) {
      this.cache$ = this.requestJokes().pipe(
        shareReplay(CACHE_SIZE)
      );
    }

    return this.cache$;
  }

  private requestJokes() {
    return this.http.get<JokeResponse>(API_ENDPOINT).pipe(
      map(response => response.value)
    );
  }
}

好了,我们已经讨论过上面的大部分代码。但等等,私有的cache$属性以及访问函数里面的if语句是什么意思?答案很简单。如果我们直接返回this.requestJokes().pipe(shareReplay(CACHE_SIZE)),那么,每一个订阅者都会创建一个新的缓存实例。但是,我们想要所有订阅者共享一个实例。因此,我们将这个实例保持在私有的cache$属性中,在第一次调用的时候初始化这个属性。这样,所有订阅者都会访问到这一个共享实例,而不是每次创建一个新的对象。

我们看一下上面代码实现的更直观的表示:

缓存时序图

上图是一个时序图,它描述了场景中涉及的对象,请求一个笑话列表,以及对象之间交换信息的时序。现在我们暂停一下,了解下发生了什么。

我们从导航到列表组件的仪表盘开始。

组件初始化之后,Angular 会调用ngOnInit生命周期钩子。在这里,我们调用JokeService暴露的访问器jokes请求笑话列表。由于这是我们第一次请求数据,所以缓存是空的,并且没有初始化,这意味着JokeService.cache$undefined。我们在访问器内部调用了requestJokes()。这会返回我们一个Observable对象,其数据来自服务器。同时,我们使用shareReplay运算符来获得所期望的行为。

shareReplay操作符会在原始源于未来所有的订阅者之间自动创建一个ReplaySubject。只要订阅者数量从零变为一,它就会将这个Subject关联到底层数据源,然后广播其所有值。未来所有订阅者都会关联到这个Subject,所以实际上只有一个订阅关联到底层的那个冷Observable。这被称为多播(multicasting),定义了我们的简单缓存的基础。

一旦数据从服务器获取到,就会被缓存。

注意,在时序图中,Cache是一个独立的对象,目的是用来说明那个从消费者(订阅者)到底层源(HTTP 请求)之间创建的ReplaySubject

下一次我们为列表中组件请求数据的时候,我们的缓存就会发送最近的值给消费者。这时候并不会有额外的 HTTP 调用。

很简单,对吧?

为了真正的区别开来,我们更进一步,从Observable的层次看看缓存是如何工作的。现在我们使用弹子图(marble diagram)看看流:
cache_share_replay

弹子图非常清晰地显示出,底层Observable只有一个订阅,所有消费者都订阅到这个共享的Observable,也就是这个ReplaySubject对象。我们也能够看出,只有第一个订阅者触发了 HTTP 调用,其余的都是直接获取重现的值。

最后,我们看一下JokeListComponent是如何显示数据的。首先,注入JokeService对象。之后,在ngOnInit中,使用服务对象暴露的访问器初始化一个jokes$属性。这个访问器会返回一个Array<Joke>类型的Observable对象,这正是我们所需要的。

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  jokes$: Observable<Array<Joke>>;

  constructor(private jokeService: JokeService) { }

  ngOnInit() {
    this.jokes$ = this.jokeService.jokes;
  }

  ...
}

注意,我们并没有马上订阅jokes$,而是在模板中使用了async管道,因为这个管道充满了奇迹。好奇吗?请阅读文章《了解关于 AsyncPipe 你不知道的三件事情》

<mat-card *ngFor="let joke of jokes$ | async">...</mat-card>

太棒了!这就是我们实际使用的简单缓存。为了验证是不是只有一次请求,打开 Chrome 的 DevTools,点击 Network 选项卡,选择 XHR。开启仪表盘,导航到列表视图,然后再导航回来。

自动更新

现在我们用几行代码构建了一个简单的缓存机制。事实上,很多工作都是由shareReplay操作符完成的。这个操作符会实现缓存和重现大多数数据值。

当数据不会在后台更新时,这就已经很好地工作了。那么,如果数据每隔几分钟就会改变了呢?我们当然不应该强制用户为了从服务器获取最新数据必须要刷新这个页面。

如果我们的缓存每隔 10 秒钟就会在后台更新呢?是不是很酷?肯定的!作为用户,我们不需要重新加载页面;数据改变后,UI 会随之更新。再说一遍,在真实的应用中,我们一般不会主动拉取数据,而是要服务器推送通知。对于我们的小示例程序而言,能够每隔 10 秒刷新一次就很好了。

这种实现很简单。简而言之,我们需要创建一个Observable,根据给定的时间间隔发出一系列值。或者简单来说,就是我们需要每 X 毫秒产生一个值。对达到这一目的,我们有很多种实现。

第一个选择是使用interval操作符。这个操作符要求一个可选参数,定义了每次发送值得时间。考虑下面的代码:

import { interval } from 'rxjs/observable/interval';

interval(10000).subscribe(console.log);

这里我们创建了一个能够发送无限整数序列的Observable对象。该对象每隔 10 秒会发出一个整数值。这意味着第一个值也会在延迟给定时间之后才会发出。为了更好理解这一行为,我们可以看一下interval的弹子图。

interval_operator

对,就像我们想的那样。这一个值会“延迟”,这不是我们想要的。为什么?因为如果我们从仪表盘导航到列表组件,希望阅读一些有趣的笑话,我们得等待 10 秒钟,才会从服务器请求数据,然后显示到屏幕。

我们通过引入另外一个操作符来解决这一问题。这个操作符是startWith(value),可以先发出一个给定值作为初始值。但我们可以做得更好!

我会告诉你,其实有一个操作符可以在给定的一段是时间之后(初识延迟)按照特定时间(正常间隔)发出一个值的序列。这就是timer

可视化时间!

timer_operator

酷!但这真的解决我们的问题了吗?是的。如果我们将初始值设置为 0,将间隔时间设置为 10 秒,我们就会有类似interval(10000).pipe(startWith(0))的行为,但只用了一个操作符。

让我们把这种实现带到我们的缓存机制中去吧。

我们需要建立一个定时器,每次触发都发送一个 HTTP 请求,去服务器获取新的数据。也就是说,每一次定时器触发,我们需要使用switchMap转换到一个Observable对象,在订阅时获取新的笑话列表。使用switchMap还有一个额外的好处是,我们可以避免竞争条件。这是这个操作符天生就有的特点,它会取消Observable之前的订阅,仅仅为最新的对象发出值。

我们的缓存的剩余部分不需要改变,意味着我们的流还是多播的,所有的订阅者共享一个底层源。

再说一遍,shareReplay天生就会将新的值广播给所有订阅者,并且将最近的值发送给新的订阅者。

timer_cache

正如我们在弹子图看到的那样,timer每 10 秒发出一个值。每一个值都会转换成一个内部Observable对象,去获取我们所需要的数据。因为我们使用了switchMap,我们避免了竞争条件,因此消费者只会接收到值 1 和 3。内部Observable对象发出的第二个值会被“跳过”,因为新值到了的时候已经取消了。

让我们利用学到的知识,更新下JokeService

import { timer } from 'rxjs/observable/timer';
import { switchMap, shareReplay } from 'rxjs/operators';

const REFRESH_INTERVAL = 10000;

@Injectable()
export class JokeService {
  private cache$: Observable<Array<Joke>>;

  constructor(private http: HttpClient) { }

  get jokes() {
    if (!this.cache$) {
      // Set up timer that ticks every X milliseconds
      const timer$ = timer(0, REFRESH_INTERVAL);

      // For each tick make an http request to fetch new data
      this.cache$ = timer$.pipe(
        switchMap(_ => this.requestJokes()),
        shareReplay(CACHE_SIZE)
      );
    }

    return this.cache$;
  }

  ...
}

厉害!想自己试试吗?下面是一个现实的示例。从仪表盘开始,到列表组件,然后看看有什么魔法出现。等几秒钟,就可以看到更新的动作。记住,缓存每 10 秒刷新一次,但通过修改REFRESH_INTERVAL的值就可以改变这一间隔。

发送更新通知

让我们回顾一下目前所构建的内容。

当我们通过JokeService请求数据时,我们希望数据从缓存获得,而不是每次都去请求服务器。缓存的底层数据每 10 秒刷新一次。刷新之后,数据会推送给组件,组件自动更新。

这有点问题。想象一下,如果我们是一个用户,正在阅读某个笑话,突然间这个笑话消失了,因为 UI 自动更新了。这无疑非常讨厌,是很坏的用户体验。

因此,我们的用户应该在有新数据时获得通知。换句话说,我们希望用户自己去更新 UI。

事实证明,我们不需要修改服务来实现这个功能。这个逻辑很简单。毕竟,我们的服务不应该关心发送通知的问题,视图应该负责何时怎样更新屏幕的数据。

首先,我们需要给用户展示一个初始值,否则的话在第一次缓存更新之前,屏幕就是空白的。我们马上就会看到原因。设置一个初始化的流与调用访问器函数一样简单。另外,既然我们只关心第一次的值,我们可以使用take操作符。

为了逻辑上的可读性,我们创建一个副主函数getDataOnce()

import { take } from 'rxjs/operators';

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  ...
  ngOnInit() {
    const initialJokes$ = this.getDataOnce();
    ...
  }

  getDataOnce() {
    return this.jokeService.jokes.pipe(take(1));
  }
  ...
}

从我们的需求可以知道,我们只想在用户真正需要更新 UI 的时候出去更新,而不是自动更新。你会问,用户如何要求更新界面?当用户点击了“更新”按钮时,我们就知道用户想要更新 UI。这个按钮和通知一起显示。现在,我们不去关心通知,把注意力集中在点击按钮之后的更新逻辑上面。

为了实现这一目的,我们需要从 DOM 事件(也就是按钮点击的事件)创建一个Observable对象。有很多种方法可以实现,但最常用的是使用Subject对象作为模板与组件视图逻辑之间的桥梁。简单来说,Subject即是Observer又是ObservableObservable定义了数据流,能够发出数据;Observer则能够订阅Observable并且接收数据。

好消息是,我们可以在模板的事件绑定中直接使用Subject,然后在事件发出时调用其next函数。这会产生一个特定值,广播给所有监听这个值的Observer对象。注意,如果Subjectvoid类型的,我们可以简单地忽略这个值。事实上,我们的用例就是这样子的。

让我们继续,实例化一个Subject对象吧。

import { Subject } from 'rxjs/Subject';

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  ...
}

下面继续,将这个值用到模板中。

<div class="notification">
  <span>There's new data available. Click to reload the data.</span>
  <button mat-raised-button color="accent" (click)="update$.next()">
    <div class="flex-row">
      <mat-icon>cached</mat-icon>
      UPDATE
    </div>
  </button>
</div>

注意我们在<button>标签的点击事件使用了事件绑定语法。当点击按钮时,我们发出一个幽灵值,可以被所有活动的Observer注意到。我们将其称为“幽灵”,是因为我们不会传递任何值,或者说只是一种void类型的值。

另一种实现是使用@ViewChild()装饰器结合 RxJS 的fromEvent操作符。但是,这会要求我们“混合” DOM 以及从视图查询 HTML 元素。使用Subject,我们仅仅将两边桥接起来,除了向按钮添加事件绑定之外,并不再触及 DOM。

好了,视图设置完毕,我们可以切换回更新 UI 的逻辑部分。

那么,更新 UI 意味着什么?既然缓存已经在后台自动刷新了,我们想要在点击按钮之后从缓存获取到最新值来渲染界面,对吧?这意味着我们的源数据流也是一个Subject。每当update$发出一个值,我们都希望将其映射到一个能够给我们最新值的Observable对象。换句话说,我们正在处理的是所谓“高阶可观察对象(Higher Order Observable)”,一个能发射ObservableObservable对象

之前我们知道,switchMap正是为了解决这个问题。这一次我们选择使用mergeMap。这个操作符非常像switchMap,区别在于它不会取消之前订阅的内部Observable对象,而是将内部发出值合并到外部的Observable

事实上,当从缓存请求最新值时,HTTP 请求已经完成,缓存已经更新。因此,在这里我们并不会遇到竞争条件。虽然看起来像是异步的,但实际上是同步的,因为值在同一时间发出。

import { Subject } from 'rxjs/Subject';
import { mergeMap } from 'rxjs/operators';

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  ...

  ngOnInit() {
    ...
    const updates$ = this.update$.pipe(
      mergeMap(() => this.getDataOnce())
    );
    ...
  }
  ...
}

太好了!每一次“更新”,我们都会使用我们之前实现的辅助函数从缓存中获取到最新值。

现在,为屏幕上呈现的笑话提供数据流只是一小步。我们还需要将初始笑话列表与update$流整合起来。

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { mergeMap } from 'rxjs/operators';

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  jokes$: Observable<Array<Joke>>;
  update$ = new Subject<void>();
  ...

  ngOnInit() {
    const initialJokes$ = this.getDataOnce();

    const updates$ = this.update$.pipe(
      mergeMap(() => this.getDataOnce())
    );

    this.jokes$ = merge(initialJokes$, updates$);
    ...
  }
  ...
}

注意,我们使用辅助函数getDataOnce(),将每一个事件映射为一个最新的缓存值。回忆一下,这个函数内部会使用take(1)来获取第一个值,然后结束整个流。这是至关重要的,否则的话,我们会得到一个正在运行的流,或者是直接连接到缓存。在这种情况下,我们只需点击“更新”按钮,就会终止强制更新 UI 的逻辑。

同时,因为底层缓存是多播的,所以重复订阅缓存以便获得最新值也是没有任何问题的。

在我们继续通知流之前,我们先暂停一下,用弹子图看看现在我们实现了什么。

jokes

正如上面的图中显示的那样,initialJokes$是至关重要的,因为没有它,我们只能在点击了“更新”之后才能在屏幕上看到内容。虽然数据在后台每 10 秒更新一次,但我们没有办法点击按钮。这是因为按钮时通知的一部分,我们并没有将其显示给用户。

让我们把这个坑填完,实现这个谜题的缺失的部分。

为了达到这一目的,我们需要创建一个Observable对象,负责显示或隐藏通知。本质上,我们需要一个能发出truefalse的流。当有更新的时候,这个值应该是true;当用户点击“更新”按钮时,这个值应该是false

另外,我们还得跳过由我们的缓存发出的第一个(初始)值,因为这不是一个刷新操作。

我们从流的角度思考这个问题,我们可以把它分解成多个流,然后再合并到一起,成为一个单一的可观察的流。最终的流就是我们所需要的行为,显示或隐藏通知。

理论已经足够了!现在是编码:

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { skip, mapTo } from 'rxjs/operators';

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  showNotification$: Observable<boolean>;
  update$ = new Subject<void>();
  ...

  ngOnInit() {
    ...
    const initialNotifications$ = this.jokeService.jokes.pipe(skip(1));
    const show$ = initialNotifications$.pipe(mapTo(true));
    const hide$ = this.update$.pipe(mapTo(false));
    this.showNotification$ = merge(show$, hide$);
  }
  ...
}

这里,我们监听从缓存发出的所有值,但是跳过第一个,因为它不是一个刷新操作。对initialNotifications$上的每一个新值,我们将其映射为true,以便显示通知。一旦点击了通知中的“更新”按钮,update$会产生一个值,我们将其简单地映射为false,来使通知消失。

我们在JokeListComponent的模板中使用showNotification$,通过切换 class 显示或隐藏通知。

<div class="notification" [class.visible]="showNotification$ | async">
  ...
</div>

很好!我们已经非常接近最终的解决方案了。但在我们继续之前,先来尝试下实例。花点时间一步步浏览下代码。

按需获取新的数据

太棒了!经过了这么长的道路,我们已经为我们的缓存实现了很多非常酷的特性。在结束本文之前,我们要将缓存提升到一个新的层次。现在还有一件事留给我们。作为一个用户,我们想要在任意时间点强制更新。

这并不是非常复杂,但我们得同时修改组件和服务。

让我们从服务开始。我们需要一个公开的 API,能够强制重新加载缓存中的数据。技术上来说,我们可以完成当前缓存,将其设置为null。这意味着下一次我们从服务器请求数据时,我们的服务会设置一个新的缓存,获取数据并向未来的订阅者存储数据。当我们强制要求更新时创建新的缓存并不是什么大问题,因为原来的对象会终止,然后被垃圾回收。事实上,这么做还有一个好处是,我们可以重置定时器,这也是我们所需要的。例如,我们已经等待了 9 秒,然后点击“获取新笑话”。我们希望数据刷新,但是不想在 1 秒钟之后就看到通知跳了出来。相反,我们想重新开始定时器,这样当我们强制更新时,就会有另外的 10 秒之后才会触发自动更新。

销毁缓存的另一个原因是,比起让缓存一直存在的其它机制,这种实现简单得多。如果是这样,缓存就需要知道是否需要强制重新加载。

我们创建一个Subject对象,用它来通知缓存结束。我们将利用takeUntil将其提取到我们的cache$流。另外,我们需要实现一个公共 API,其作用是将缓存设置为null,然后向Subject对象广播一个事件。

import { Subject } from 'rxjs/Subject';
import { timer } from 'rxjs/observable/timer';
import { switchMap, shareReplay, map, takeUntil } from 'rxjs/operators';

const REFRESH_INTERVAL = 10000;

@Injectable()
export class JokeService {
  private reload$ = new Subject<void>();
  ...

  get jokes() {
    if (!this.cache$) {
      const timer$ = timer(0, REFRESH_INTERVAL);

      this.cache$ = timer$.pipe(
        switchMap(() => this.requestJokes()),
        takeUntil(this.reload$),
        shareReplay(CACHE_SIZE)
      );
    }

    return this.cache$;
  }

  forceReload() {
    // Calling next will complete the current cache instance
    this.reload$.next();

    // Setting the cache to null will create a new cache the
    // next time 'jokes' is called
    this.cache$ = null;
  }

  ...
}

这部分没做太多工作,所以我们继续,以便将其用到JokeListComponent中。我们实现了一个函数forceReload(),当我们点击了“获取新笑话”按钮之后会被调用。另外,我们还需要创建一个Subject对象,作为更新 UI 并且显示通知的事件总线。我们马上就会看到它的作用。

import { Subject } from 'rxjs/Subject';

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  forceReload$ = new Subject<void>();
  ...

  forceReload() {
    this.jokeService.forceReload();
    this.forceReload$.next();
  }
  ...
}

在适当的位置上,我们将JokeListComponent模板中的按钮连接起来,以便强制重新加载缓存数据。我们需要做的就是使用 Angular 的事件绑定语法监听点击事件,然后调用forceReload()

<button class="reload-button" (click)="forceReload()" mat-raised-button color="accent">
  <div class="flex-row">
    <mat-icon>cached</mat-icon>
    FETCH NEW JOKES
  </div>
</button>

这已经可以正常工作了,但只有当我们回到仪表盘,再重新进入列表视图时才是正常的。这当然不是我们所需要的。我们想要强制更新缓存数据的时候,UI 能够立即更新。

还记得我们实现了updates$流,当我们点击“更新”时,会从缓存获取最新的数据?我们就需要类似这种的行为,所以我们需要扩展一下这个流。这意味着,我们需要将update$forceReload$合并起来,因为这两个流都需要更新 UI。

import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { mergeMap } from 'rxjs/operators';

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  forceReload$ = new Subject<void>();
  ...

  ngOnInit() {
    ...
    const updates$ = merge(this.update$, this.forceReload$).pipe(
      mergeMap(() => this.getDataOnce())
    );
    ...
  }
  ...
}

这是不是很简单?是的,但我们还没完成。事实上,我们刚刚“打断”了我们的通知。这本可以正常工作,直到我们点击了“获取新笑话”。屏幕上的数据会更新,缓存中的也是,但我们等待 10 秒之后,并没有弹出通知。问题出在强制更新缓存会完成缓存实例,这意味着组件再也不会收到值。简单来说,通知流(initialNotifications$)死了。这很不想,怎么解决这个问题呢?

很简单!我们可以监听forceReload$的事件,对其每一个值都切换到一个新的通知流。重要的是,我们需要取消之前的流的订阅。听起来耳熟吗?好像我们需要switchMap,是不是?

让我们动手实践吧!

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { take, switchMap, mergeMap, skip, mapTo } from 'rxjs/operators';

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  showNotification$: Observable<boolean>;
  update$ = new Subject<void>();
  forceReload$ = new Subject<void>();
  ...

  ngOnInit() {
    ...
    const reload$ = this.forceReload$.pipe(switchMap(() => this.getNotifications()));
    const initialNotifications$ = this.getNotifications();
    const show$ = merge(initialNotifications$, reload$).pipe(mapTo(true));
    const hide$ = this.update$.pipe(mapTo(false));
    this.showNotification$ = merge(show$, hide$);
  }

  getNotifications() {
    return this.jokeService.jokes.pipe(skip(1));
  }
  ...
}

好了。只要forceReload$一发出值,我们就取消订阅之前的Observable,切换到一个新的通知流。注意我们有一段代码需要两遍,也就是this.jokeService.jokes.pipe(skip(1))。为了避免重复代码,我们创建一个函数getNotifications(),返回跳过第一个值的笑话的流。最后,我们将initialNotifications$reload$合并到名为show$的流。这个流负责在屏幕上显示通知。这里并不需要取消订阅initialNotifications$,因为这个流在下一次订阅重新创建缓存之前就已经结束了。其余部分保持不变。

呼,我们完成了。现在花掉事件看看我们实现了什么。
notification_cache

在弹子图中可以看到,initialNotifications$对显示通知非常重要。如果我们缺失了这个流,就只能在强制更新缓存之后才会看到通知。也就是说,我们按需请求新的数据时,必须不断切换到一个新的通知流,因为之前的(旧的)Observable对象已经完成,不会再发出值。

大功告成!我们使用 RxJS 和 Angular 提供的工具创建并实现了一个复杂的缓存机制。回顾一下,我们的服务暴露了一个笑话列表的流。底层的 HTTP 请求每 10 秒更新缓存。为了改进用户体验,我们显示一个通知,以便用户强制更新 UI。在这之上,我们还实现了一个允许用户按需请求新的数据的方法。

太棒了!这就是最终的解决方案。花几分钟检查一下代码,尝试下不同的场景,看看一切是否正常。

前景

如果你想做点作业,或者再多思考下,下面有几个可以改进的地方:

  • 添加错误处理
  • 重构组建中的逻辑到一个服务以便重用

特别感谢

特别感谢 Kwinten Pisman 帮助完成代码。同样,感谢 Ben LeshBrian Troncone 提供的宝贵反馈以及之处一些改进点。另外,感谢 Christoph Burgdorf 帮助复查文章和代码。

Leave a Reply