狠狠色丁香婷婷综合尤物/久久精品综合一区二区三区/中国有色金属学报/国产日韩欧美在线观看 - 国产一区二区三区四区五区tv

LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發文檔 其他文檔  
 
網站管理員

RxJS 全面解析

admin
2024年11月11日 19:35 本文熱度 585

作者:霍世杰 @阿杰

前言

打開此文的小伙伴想必對 RxJS 已經有了或多或少的了解,如果沒有倒也無妨,因為下面會從零開始講起;如果你帶著幾個問題來翻閱,本人也希望此文可以帶你找到答案。

溫馨提示:文章內容較長,建議收藏反復觀看。

概覽

從我個人的學習 RxJS 的歷程來看,最開始是“照貓畫虎”能夠基本使用,隨后是研究部分操作符和使用場景,最后了解產生背景、設計思想以及實現原理。在這期間有過很多疑問,也曾從不同角度理解 RxJS,最終總結了認為比較系統的知識圖譜(下圖)。

深入理解 RxJS

大“道”——響應式編程

全面理解一個事物,追溯其歷史是一種好的方式,RxJS 的起源需要追溯到響應式編程(RP),后續產生了一系列基于響應式編程范式的語言擴展(Rx,RxJS 就是其中之一),請看歷史簡譜(左向右延續)。

何為響應式

響應式是學習 RxJS 必須要理解的概念,本人用了大量的文字來解釋,如果您已經深刻理解,可直接跳過。如果您是第一次接觸這個名詞,也不要先給自己心里暗示它有多么的高深和難以理解,也許你天天在使用。

一個例子

為了避免上來就接觸晦澀的概念,先來舉個例子:博客平臺關注功能。話說你偶然瀏覽到阿杰的文章,覺得寫的很贊,于是你關注了他的博客賬號,以便不會錯過之后的干貨,在以后的日子里阿杰每發布一篇文章博客平臺都會給你推送一條消息,提醒你來給他點點贊,假設博客平臺沒有關注的功能,那么你需要想知道他的最新動態就只能打開他的個人主頁查看文章列表來確認,也許稍不留意就會錯過他的文章。這個例子出現了粉絲關注博主、博主發布博客、平臺自動推送給粉絲消息、給文章點贊,這就形成了響應式閉環,平臺在觀察到博主粉絲只需要關注一下就能收到博主以后的動態,這就是響應式帶來的好處。

另一個例子

再舉一個貼近我們開發的例子:假設有一個更新某用戶密碼的需求,A 同事負責調用更新邏輯并在更新后執行其他任務(比如提醒用戶更新成功或失?。珺 同事負責具體更新密碼的邏輯,下圖描述了完成整個任務的流程:

  1. 驗證一下用戶信息的真實性

  2. 驗證密碼是否合法

  3. 最終把新的密碼入庫

上述的每個環節都有可能是異步耗時任務,比如用戶的真實性是第三方平臺驗證的,入庫的過程中網絡非常慢,再比如......等等,諸如此類的各種不確定性,這對于 B 同事做后續任務就有了一個關鍵性條件,確定/等待更新結果,這種情況有一種做法是:定期輪詢重試,B 每隔一段時間執行一次,直到確定 A 已經修改成功,再去執行后續操作。邏輯中定時 A 邏輯結束這種做法這種做法明顯有一個弊端是執行多次,對于 B 顯然不是好的做法,好的做法是:B 的更新邏輯執行完后通知 A,甚至 B 可以先把更新后的事準備好,讓 A 決定后續邏輯的執行時機。

流程如圖示:訂閱/執行更新邏輯、更新邏輯結束、將結果通知調用者、執行后續邏輯。這就是響應式的做法,它帶來的好處是:當更新結果發生變化時自動通知調用者,而不用輪詢重試。

了解響應式宣言

相信你已經明白了響應式,并能發現生活/工作中到處可見,下面了解一下設計響應式模塊/系統遵循的原則:

  • 即時響應性:只要有可能,就要及時地做出響應。

  • 回彈性:執行過程中在出現失敗時依然保持即時響應性。

  • 彈性: 在不斷變化的工作負載之下依然保持即時響應性。

  • 消息驅動:反應式依賴異步的消息傳遞,從而確保了松耦合、隔離、位置透明的組件之間有著明確邊界。

響應式編程

下面我們正式的介紹響應式編程:

響應式編程,Reactive Programing,又稱反應式編程,簡稱 RP,是一種以傳播數據流(數據流概念戳這里 )的編程范式。

響應式編程反應式編程(英語:Reactive programming)是一種面向數據流和變化傳播的聲明式編程范式。這意味著可以在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。

優勢:

  1. 聲明式,方便地表達靜態或動態的數據流

  2. 自動化,自動將變化的值通過數據流進行傳播

核心思想:從傳統的調用方“拉”數據的思維模式轉變為被調用方“推”數據的思維模式。

JS 異步編程

眾所周知,JS 執行環境是單線程的,在事件監聽,異步的處理,響應式編程毋庸置疑是其中的一大主力。

Callback 時代

回調函數延續至今,JS 的運用高階函數巧妙地將異步后的邏輯進行托管,以事件驅動的方式來解決異步編程,但它有一個“臭名昭著”的問題:回調嵌套,耦合度高。本來很簡單的邏輯但為了控制執行流程卻不得不寫大量的代碼,當時產生了一些知名的庫:async、bluebrid,它們封裝和處理了嵌套問題,暴露出更為簡單好用的 API,額外還可以優雅地處理流程控制相關場景,但所做的只是劃分了邏輯,依舊沒有解決代碼臃腫的問題。

Promise 時代

ES6 納入 Promise 之后可謂一大喜訊,因為它解決了回調嵌套的問題,雖然它只是回調的語法糖,但在處理流程和捕獲錯誤(外層處理)已經非常的優雅了,但它的弊端是:無法監聽和打斷 Promise 的狀態。這意味著一旦聲明它會立即執行并修改它的執行狀態,這源于它的實現。

Generator

Generator 是處于 Promise 和 Async/await 之間的產物,它給我們帶來了寫異步語法像寫同步一般,只需在函數前加 * 修飾,這樣就可以在函數內部使用一個 yield 關鍵字返回結果,類似于 await ,但它也并非完美,不然也不會有后面的 Async/await 了,它的主要問題是流程管理不方便(迭代器模式實現,主動調 next 執行器流轉游標)。

Async/await

Async/await 是 Generator 語法糖,既保留了語法上的優勢,也解決了 Generator 每步要調一下 next 執行器的弊端,是現階段的最佳方案,就是得吐槽一下 Top-level await 到 ES2022 才出現。

其中 Generator 和 Async/await 在異步編程是以等待的方式處理。

ReactiveX

業界一致認為正統的響應式實現/擴展是 ReactiveX 系列。

ReactiveX,簡稱 Rx,是基于響應式的擴展,是各種語言實現的一個統稱,除了我們所知道的 RxJS,還有 RxJava、Rx.NET、RxKotlin、RxPHP.....它最早是由微軟提出并引入到 .NET 平臺中,隨后 ES6 也引入了類似的技術。

它擴展了觀察者模式,以支持數據序列和/或事件,并添加了操作符,允許您以聲明的方式將序列組合在一起,同時抽象出諸如低級線程、同步、線程安全、并發數據結構和非阻塞I/O等問題。

RxJS

RxJS 全稱 Reactive Extensions for JavaScript,翻譯過來是 Javascript 的響應式擴展,它是一個采用流來處理異步和事件的工具庫,簡單來說 Rx(JS) = Observables + Operator + Scheduler。

擅長做的事:

  • UI 事件:例如鼠標移動、按鈕單擊......

  • 狀態管理:例如屬性更改、集合更新等事件

  • IO 消息事件:服務監聽

  • 廣播/通知:消息總線(Event bus)

  • 網絡消息/事件:例如 HTTP、WebSockets API 或其他低延遲中間件

最大的優勢:異步事件的抽象,這意味著可以把很多事統一起來當作一種方式處理,從而讓問題變得更簡單,同時也降低了學習成本。

注意:RxJS 擅長做異步的事,不代表不可以做同步或不擅長同步的事。

RxJS 在 Angular 中的應用

RxJS 在 Angular 中及其重要,很多核心模塊都是由 RxJS 實現的,比如:

  • 響應式表單

  • 路由

  • HttpClient(封裝的 ajax,類似于 axios)

  • async 管道符

  • 狀態管理

更多: https://angular.io/guide/observables-in-angular

RxJS 核心概念—— Observables

RxJS 中的 Observables 系列是圍繞觀察者模式來實現的,基本角色:

  1. Observable:被觀察者,用來產生消息/數據。

  2. Observer:觀察者,用來消費消息/數據。

Observable

Observeable 是觀察者模式中的被觀察者,它維護一段執行函數,提供了惰性執行的能力(subscribe)。

核心函數

  • constructor(_subscribe) : 創建 Observeable

  • static create(_subscribe):靜態函數創建 Observeable

  • pipe():管道

  • subscribe():執行初始化傳入的 _subscribe

RxJS 中 Observeable 是一等公民,將一切問題都轉化為 Observable 去處理。轉換的操作符有from 、 fromEvent 、 of 、 timer 等等,更多戳 這里。
注意的是:只有 ObservableInput或 SubscribableOrPromise類型的值才可以轉化為 Observable。

基本使用

源碼實現

本人寫(抽取)了一套 RxJS Observable 源碼中的核心實現

Observable 與 Promise

用過兩者的同學可能會有疑問為什么采用 Observable 而不直接用 Promise 或 Async/await,這兩者在業界也常常用來做對比。

它們關鍵性的不同點:


ObservablePromise
使用場景同步、異步均可使用用 Promise 包裹的多數是異步場景
執行時機聲明式惰性執行,只有在訂閱后才會執行創建時就立即執行
執行次數多次調用 subscribe 函數會執行多次只有第一次執行,后續都是取值
流程控制相較于 Promise 有更為全面的操作符提供串行、并行的函數
錯誤處理subscribe 函數捕獲錯誤.catch 捕獲

總的來說,Promise 可讀性更優,Observable 從使用場景更為全面。

兩者的相互轉換

在既使用了 RxJS 又引用了用 Promise 封裝的庫時,兩者相互轉換是容易碰到的問題,RxJS 提供了兩者轉換的函數。

Promise 轉 Observable

from 或 fromPromise(棄用) 操作符

const observable$ = from(fetch('http://xxx.com/'));

Observable 轉 Promise

const promise = of(42).toPromise();
const errorPromise = throw(new Error('woops')).toPromise();
errorPromise.catch(err=> console.error);

Subscriber/Observer

Subscriber/Observer 是觀察者模式中的觀察者/消費者,它用來消費/執行 Observable 創建的函數。

核心能力

  • next (傳值)

  • error (錯誤處理)

  • complete (完成/終止)

實現

白話描述:

  1. 將 subscribe 傳進去一個 next 函數賦給 Observer 的 next 函數。

  2. 將 Observer 傳給 Observable 初始化的預加載函數 _subscribe。

  3. 執行 Observable 初始化的預加載函數

工作流程

Subscription

上面的 Observable 和 Observer 已經完成了觀察者模式的核心能力,但是引發的一個問題是,每次執行一個流創建一個 Observable,這可能會創建多個對象(尤其是大量使用操作符時,會創建多個 Observable 對象,這個我們后面再說),此時需要外部去銷毀此對象,不然會造成內存泄露。

為了解決這個問題,所以產生了一個 Subscription 的對象,Subscription 是表示可清理資源的對象,它是由 Observable 執行之后產生的。

核心能力

  • unsubcribe (取消訂閱)

  • add (分組或在取消訂閱之前插入一段邏輯)

注意:調用unsubcribe后(包含add傳入的其它 Subscription)不會再接收到它們的數據。

使用

實現

白話描述:

  1. 調用 Observable 的 subscribe 后會添加(add 方法)到 Subscription(這里有個關系 Subscriber 繼承了 Subscription) 中,并把 Subscriber(也是 Subscription)返出去。

  2. 調用 Subscription 的 unsubscribe 方法。

  3. unsubscribe 把該對象置空回收。

完整工作流程

Subject

上述的 Observable 歸根到底就是一個惰性執行的過程,當遇到以下兩種情況就顯得偏弱:

  1. 推送多條數據時,需要就要創建多個對象。

  2. 狀態管理或消息通訊,監聽數據變化并實時推送。

基于這兩個方面,所以產生了 Subject,Subject 是一個特殊的 Observable,更像一個 EventEmitter,它既可以是被觀察者/生產者也可以是觀察者/消費者。

優勢

  1. 減少開銷和提高性能

  2. 數據實時推送

場景

消息傳遞或廣播。

與 Observable 的區別


ObservableSubject
角色生產者(單向)生產者、消費者(雙向)
消費策略單播多播
流轉方式內部發送/接收數據外部發送/接收數據
數據特性冷數據流熱數據流
消費時機調用 subscribe調用 next

重點解釋一下消費策略和消費時機兩塊:

冷數據流:可以訂閱任意時間的數據流。

熱數據流:只給已訂閱的消費者發送消息,定閱之前的消費者,不會收到消息。

用一個示例來演示:

工作原理

PS:忘記了該圖出自哪篇文章,畫的挺不錯的,這里直接引用了,如有侵權,還望聯系作者。

源碼實現

  • observers 訂閱者集合

  • _subscribe 添加訂閱者

  • next 函數將所有訂閱者推送相同的數據

其他 Subject

種類作用
BehaviorSubject回放數據,如果是訂閱前推送的數據,只回放最新的值
ReplaySubject回放數據,初始化設定要緩存多少次的值,然后將這批消息推送
AsyncSubject只有調用 complete 后才會推送數據

操作符(Operator)

由于篇幅問題,本節并不會細化到講每個操作符

理解操作符

Operator 本質上是一個純函數 (pure function),它接收一個 Observable 作為輸入,并生成一個新的 Observable 作為輸出

export interface Operator<T, R> {
  call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
// 等價于
function Operator(subscriber: Subscriber<R>, source: any){}

遵循的小道

迭代器模式和集合的函數式編程模式以及管道思想(pipeable)

函數式編程

操作符的實現以及使用均依照函數式的編程范式,Functional Programing,簡稱 FP,函數式編程范式,它的思維就是一切用函數表達和解決問題,避免用命令式。

優點:

  • 鏈式調用/組合開發

  • 簡單易寫易讀(聲明式)

  • 可靠性(純函數不存在依賴)

  • 惰性求值(高階函數)

  • 易于測試

更多詳細看這篇 不完全指南

pipe

管道,用來承載數據流的容器,相信大家一定用過 Lodash 的chain,原生 js 數組,NodeJS 開發者 也許還知道 async/bluebird 的 waterfall,Mongodb 的 pipe,它們都遵循管道思想,最直接的好處是鏈式調用,還可以用來劃分邏輯,在異步的場景中還可以做流程控制(串行、并行、競速等等)。

為什么要有操作符?

遵循符合響應式宣言,單向線性的通訊或傳輸數據,pipe 可以降低耦合度以便于閱讀和維護,把復雜的問題分解成多個簡單的問題,最后在組合起來。

操作符與數據流

在 RxJS 的世界解決問題的方式是抽象為數據流,整個閉環是圍繞數據流進行的,所以我們再來理解一下數據流:流,可以把數據可以想像成現實中的水流,河流,流有上游、下游每個階段處理不同的事情,在這過程避免不了要操作流,比如合并、流程控制、頻率控制等等,所以操作符就扮演了此角色。

生命周期:創建流(create、new、創建類操作符)——> 執行流(subscribe) ——> 銷毀流(unsubscribe)

分類

工作原理

迭代器模式:當多個操作符時,組合成多個可迭代對象的集合,執行時依次調用 next 函數。

源碼實現

  1. 操作符傳入 pipe

  2. pipe 將操作符轉換成可迭代的 Array

  3. subscribe(執行流)時消費操作符邏輯

如圖

操作符轉換 Array 源碼

export function pipeFromArray(fns: Array<Function>): Function {
    if (fns.length === 0) {
        return (x: any) => x;
    }
    if (fns.length === 1) {
        return fns[0];
    }
    return (input: any) => {
        return fns.reduce((prev: any, fn: Function) => fn(prev), input);
    };
}

創建自定義操作符

方式一

const isEven = () => {
    return (source: Observable<any>) => {
        return new Observable<any>(observer => {
            const subscription = source.subscribe((x) => {
                observer.next(x % 2 === 0);
                observer.complete();
            })
            return () => subscription.unsubscribe();
        })
    }
}
new Observable(observer => {
    observer.next(7);
})
    .pipe(isEven())
    .subscribe(console.log);
// 執行結果:false

方式二:基于 lift

const odd = () => {
    const operator: Operator<any, any> = {
        call(subscriber: Subscriber<any>, source: any) {
            const subscription = source.subscribe((x: any) => subscriber.next(x % 2 !== 0));
            return () => {
                subscription.unsubscribe();
            };
        },
    }
    return operator;
}
new Observable(observer => {
    observer.next(7);
})
    .lift(odd())
    .subscribe(console.log)
// 執行結果 true

lift 源碼

閱讀彈珠/大理石圖

學會閱讀彈珠圖是快速理解 Rx 操作符的手段之一,有些操作符需要描述時間流逝以及序列,所以彈珠圖有很多的標識和符號,如下圖。

這里有幾個用來理解大理石圖的網站:

學習參考

  • Async.js

  • Lodash

調度器(Scheduler)

何為調度器

也許你在使用操作符的過程中從未在意過它,但它在 Rx 起著至關重要的作用,在異步中如何調度異步任務是很復雜的事情(尤其是以線程為核心處理異步任務的語言),很慶幸的是我們用使用的 JS ,所以不需要過多的關注線程問題,更友好的是大多數操作符默認幫開發者選中了合適的調度模式(下文會講到),以至于我們從忽略了它,但無論如何我們都應該對調度器有基本的了解。

調度器,Scheduler 用來控制數據推送節奏的,RxJS 有自己的基準時鐘和一套的執行規則,來安排多個任務/數據該如何執行。

官方定義:

  • Scheduler 是一種數據結構

  • Scheduler 是一個執行環境

  • Scheduler 是一個虛擬時鐘

種類/模式

種類描述
null不傳遞或 null 或 undefined,表示同步執行
queue使用隊列的方式執行
asap全稱:as soon as possible ,表示盡快執行
async使用 setInterval 的調度。

示例

下面我們舉例略窺一下各個模式的表現。

null/undefined/sync

import { asapScheduler, asyncScheduler, from } from 'rxjs';
function syncSchedulerMain() {
    console.log('before');
    from([1, 2, 3]).subscribe(console.log)
    console.log('after');
}
syncSchedulerMain();
// 執行結果:
// before
// 1
// 2
// 3
// after

asap

function asyncSchedulerMain() {
    console.log('asyncScheduler: before');
    from([1, 2], asyncScheduler).subscribe(console.log)
    Promise.resolve('asyncScheduler: promise').then(console.log);
    console.log('asyncScheduler: after');
}
// 執行結果:
// asapScheduler: before
// asapScheduler: after
// 1
// 2
// asapScheduler: promise

從結果示,from 的數據的輸出順序是在 console.log(同步代碼)之后,promise.then 之前的。

async

function asapSchedulerMain() {
    console.log('asapScheduler: before');
    from([1, 2, 3], asapScheduler).subscribe(console.log)
    Promise.resolve('asapScheduler: promise').then(console.log);
    console.log('asapScheduler: after');
}
// 執行結果:
// asyncScheduler: before
// asyncScheduler: after
// asyncScheduler: promise
// 1
// 2

結果示,from 數據輸出順序是在 console.log(同步代碼)和 Promise.then 之后的。

工作原理

Scheduler 工作原理可以類比 JS 中的調用棧和事件循環,從實現上 aspa和 async也的確交給事件循環來處理。null /undefined相當于調用棧,aspa相當于事件循環中的微任務async相當于宏任務,可以肯定的是微任務執行時機的優先級比宏任務要高,所以從執行時機來看 null > aspa > async。queue運行模式根據 delay 的參數來決定,如果是 0,那么就用同步的方式執行,如果大于 0,就以 async 模式執行。

使用原則/策略

RxJS Scheduler 的原則是:盡量減少并發運行。

  1. 對于返回有限和少量消息的 observable 的操作符,RxJS 不使用調度器,即 null 或 undefined 。

  2. 對于返回潛在大量的或無限數量的消息的操作符,使用 queue 調度器。

  3. 對于使用定時器的操作符,使用 aysnc 調度器。

支持調度器的操作符

of 、 from 、 timer 、 interval 、 concat 、 merge 、 combineLatest ,更多戳 這里。

bufferTime 、 debounceTime 、 delay 、 auditTime 、 sampleTime 、 throttleTime 、 timeInterval 、 timeout 、 timeoutWith 、 windowTime 這樣時間相關的操作符全部接收調度器作為最后的參數,并且默認的操作是在 Scheduler.async 調度器上。

OK,關于調度器我們先了解到這里。

最后

至此,RxJS 內容已經講解完畢,文中概念較多,若大家都能夠理解,就可以對 RxJS 的認知拉到同一個維度,后續需要做的就是玩轉各種操作符,解決實際問題,學以致用才可達到真正的精通。

最后如果覺得文章不錯,點個贊再走吧!

附文中完整代碼與示例: https://github.com/aaaaaajie/simple-rxjs

推薦閱讀

參考


該文章在 2024/11/12 11:22:52 編輯過
關鍵字查詢
相關文章
正在查詢...
點晴ERP是一款針對中小制造業的專業生產管理軟件系統,系統成熟度和易用性得到了國內大量中小企業的青睞。
點晴PMS碼頭管理系統主要針對港口碼頭集裝箱與散貨日常運作、調度、堆場、車隊、財務費用、相關報表等業務管理,結合碼頭的業務特點,圍繞調度、堆場作業而開發的。集技術的先進性、管理的有效性于一體,是物流碼頭及其他港口類企業的高效ERP管理信息系統。
點晴WMS倉儲管理系統提供了貨物產品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質期管理,貨位管理,庫位管理,生產管理,WMS管理系統,標簽打印,條形碼,二維碼管理,批號管理軟件。
點晴免費OA是一款軟件和通用服務都免費,不限功能、不限時間、不限用戶的免費OA協同辦公管理系統。
Copyright 2010-2025 ClickSun All Rights Reserved