Angular 前端

Angular 學習筆記(11) — RxJs Operators

JamesW 2024/08/26 07:57:01
84

上一篇介紹了Observable,而RxJs Operators 用在Observable 在訂閱Subscribe之前,可透過加入pipe來重新整理觀測資料的結構。

目標:了解工具RxJs Operators

 

參考文章文件

map

  • 將一個訂閱可以得到的資料轉換成另外一筆資料:

 

  • 重新整理過後的response 就會變成不同的結構:

 

tap

  • 主要取得訂閱資料進行簡單設定本地參數、或檢查來進行除錯,不會影響資料本身,用在資料不需要邏輯判斷的情境

 

this.route.params
 .pipe(
   tap(({ action, id }) => {
     this.action = action;
     this.columnId = id;
   }),
 )

switchMap

  • 當訂閱的資料需要同時去做其他的HTTP Request時,可以在pipe裡使用switchMap同時進行其他行為:

參考文章

this.route.params
.pipe(
  switchMap(({ id }) => this.columnLibraryService.fetchOne(id))
)
.subscribe(({ data: { columnLibrary } }) => {
 //在subscribe下面接上switchMap的response
  this.columnLibraryItem = columnLibrary; 
}

...
switchMap(({ id }) =>
 return of(id) // switchMap需要回傳observable,可用of回傳
)

concatMap

  • concatMap會等前面的 Observable 結束後,才會「接續」(concat)新產生的 Observable 資料流。
  • 轉換後的 Observable 基本上都必須設定結束條件,也就是要確保會完成 (complete),否則很容易就會產生不可預期的問題

 

...
.pipe(
    concatMap(() => {
      return this.tokenService.doPost();
    }),
    tap(({ accessToken, refreshToken }) => {
      this.accessToken.set(accessToken);
      this.refreshToken.set(refreshToken);
    })
  );
...

forkJoin

  • 在做pipe或subscribe以前,結合兩個或以上的Http Request 會在所有 observable 都完成(complete)後,才會取得最終的結果。

 

return forkJoin([
    this.compTypePmcProviderService.fetch(),
    this.compStatusProviderService.fetch(),
  ]).pipe(
    tap(([{ data: categoryOptions }, { data: statusOptions }]) => {
      this.categoryOptions = categoryOptions;
      this.statusOptions = statusOptions;
    })
  );
forkJoin([
      this.menuIconProviderService.fetch(),
      this.menuStatusProviderService.fetch(),
      this.displayMenuProviderService.fetch(),
      this.programCategoryProviderService.fetch(),
    ]).subscribe(
      ([
        { data: iconOption },
        { data: statusOption },
        { data: displayOption },
        { data: programOptions },
      ]) => {
        this.iconOption = iconOption;
        this.statusOption = statusOption;
        this.displayOption = displayOption;
        this.programOptions = programOptions;
      }

combineLatest

  • 將 Observables 訂閱後把陣列內的資料流組合起來,在 RxJS 整個資料流有資料變更時才會發生。

參考文章

combineLatest([this.route.params, this.route.queryParams])
.subscribe(([{ id }, { params }])=>{ 
 
}) 
// 兩個 route Observable只能用combineLatest組合,
// 因為route會持續變動所以尚未完成(complete)

take

  • 只取得開頭n個值

 

return this.authService.user.pipe(
      take(1),/// 只接取BehaviorSubject的一筆資料
  )

exhaustMap

  • Observable 有新事件發生時,若上一個 Observable動作尚未結束,就不會產生新的 Observable,避免產生重複的 API 請求。

參考文章

/** BehaviorSubject **/
user = new BehaviorSubject<User>(null);

/** exhaustMap 能夠同時使用多個pipe的方式 **/
fetchRecipes() {
    return this.authService.user.pipe(
      take(1),/// 只接取BehaviorSubject的一筆資料
      exhaustMap((user) => {
        return this.http.get<Recipe[]>(
          //1.插入response 的Interface
          'https://ng-course-recipe-book-a4606-default-rtdb.firebaseio.com/recipes.json',
          {
            params: new HttpParams().set('auth', user.token),
          }
        );
      }),
      map((response) => {
        return response.map((response) => {
          return {
            ...response,
            ingredients: response.ingredients ? response.ingredients : [],
          }; /// 加入食材欄位邏輯
        });
      }),
      tap((response) => {
        ////???????
        this.recipeService.setRecipes(response);
      })
    );
  }
}

EMPTY

  • 若中斷執行中的rxjs,使用return EMPTY 中斷

 

pipe(
  map(() => this.isAuthenticated),
  catchError((e) => {
    console.error('[Auth]', e);

    return EMPTY; // 立即完成的 observable 。
  })
);

of

  • observable 傳值方式,可同步傳遞多個值

 

handleShow: (params: any) => Observable<any> = (params) => {
isProgramCategoryReadOnly = true;
    ...

    return of(true);
  };

參考文章

filter

  • filter 是常用的「過濾類型」的 operator , 每當來源資料流事件的資料符合 callback function 內的條件時,才會發生。

 

pipe(
  filter(data => data > 3)
)
.subscribe(data => {
  console.log(`filter 範例 (1): ${data}`);
});
// filter 範例 (1): 4
// filter 範例 (1): 5
// filter 範例 (1): 6
// filter 範例 (1): 7
// filter 範例 (1): 8
// filter 範例 (1): 9

參考文章

catchError 、throwError

  • API 回傳時的錯誤情境,Response發生錯誤時,顯示錯誤訊息:

 

  • 可以 console.log(error)觀看錯誤:

 

  • Rxjs 中也可以抓錯:

 

  • 若是request 發生錯誤:

 

  • 回傳 response 可用pipe的catchError、throwError的方式搭配HttpClient 來處理錯誤的error。

 

/** service.ts **/
import { catchError, throwError } from 'rxjs'; //引入

.... 

signUp(email: string, password: string) {
    return this.http
      .post<AuthResponseData>('api權限url', {
        email: email,
        password: password,
        returnSourceToken: true,
      })
      .pipe(
        catchError((errorRes) => {
          let errorMessage = 'An unknown error occurred!';

          if (!errorRes.error || !errorRes.error.error) {
            return throwError(errorMessage);
          }
          switch (errorRes.error.error.message) {
            case 'EMAIL_EXISTS':
              errorMessage = 'This is email is already exists';
          }

          return throwError(errorMessage);
        })
      );
  }


//** component.ts **//
this.authService.signUp(email, password).subscribe({
   ...
    error: (errorMessage) => {
      ///error已透過pipe的catchError整理
      this.error = errorMessage;
    },
  });
  • HttpErrorResponse

 

import { HttpErrorResponse } from '@angular/common/http';
...
.pipe(
        catchError((errorRes) => {
          let errorMessage = 'An unknown error occurred!';

          if (!errorRes.error || !errorRes.error.error) {
            return throwError(errorMessage);
          }
          switch (errorRes.error.error.message) {
            case 'EMAIL_EXISTS':
              errorMessage = 'This is email is already exists';
          }

          return throwError(errorMessage);
        })
      );
)
...

==> /*** 使用 HttpErrorResponse 來接替錯誤訊息處理 **/
  ...
  .pipe(catchError(this.handleError));
  ...
  
  private handleError(errorRes: HttpErrorResponse) {
      let errorMessage = 'An unknown error occurred!';
  
      if (!errorRes.error || !errorRes.error.error) {
        return throwError(errorMessage);
      }
      switch (errorRes.error.error.message) {
        case 'EMAIL_EXISTS':
          errorMessage = 'This is email is already exists';
      }
  
      return throwError(errorMessage);
    }

finalize

  • 當Observable完成或報錯時調用函數

 

const source = interval(1000);
const example = source.pipe(
  take(5), //take only the first 5 values
  finalize(() => console.log('Sequence complete')) // 完成或錯誤時執行
);
const subscribe = example.subscribe(val => console.log(val));

// results:
// 0
// 1
// 2
// 3
// 4
// 'Sequence complete'

Custom Operator

  • 自定義一個RxJs的Operator,將複雜的運作邏輯抽離,做成共用的utlis
  1. 將原本的Observable做pipe的簡化

 

this.service.fetch()
 .pipe(
   map(({data})=>{
    .....

2. 建立一個utlis,將RxJs的步驟抽離:

/** utlis.ts **/

// 將 RxJs operator 抽離
export const transformData = () => (source: Observable<data[]>) => {
    return source.pipe(  
      map(({ data }) => {
     ...

3. 套用上分離的operator即可使用:

this.service.fetch()
 .pipe(
  transformData() // 套上抽離出來的operator 
 )

結語:

學會使用RxJs Operators ,可以靈活的處理非同步的資料。在後續的處理資料以及拆分邏輯對開發來說是很有幫助的工具,依照不同情境使用不同的Operators 相當的方便,可將資料變為非同步資料後,利用工具進行一系列的處理。

JamesW