RxJS Higher Order Observables và Utility Operators trong RxJS
- 27-12-2023
- Toanngo92
- 0 Comments
Mục lục
RxJS Higher Order Observables (HOOs)
RxJS Higher Order Observables (HOOs) là những công cụ đặc biệt trong lập trình. Chúng hoạt động tương tự như “map()
“, một công cụ biến đổi dữ liệu.
Hãy tưởng tượng bạn có một loạt các số được phát ra mỗi giây (0, 1, 2, 3, 4…), và bạn muốn làm gì đó với chúng. Ví dụ, “map()
” giúp bạn nhân đôi từng số: 0 thành 0, 1 thành 2, 2 thành 4, và tiếp tục như vậy.
Nhưng HOOs thì đặc biệt hơn: chúng không chỉ biến đổi từng số, mà còn tạo ra một loạt các số mới theo cách đặc biệt, tạo thành một chuỗi mới để bạn có thể làm việc với.
Nó giống như việc bạn không chỉ nhân đôi mỗi số, mà còn tạo ra một chuỗi mới của những con số đã được nhân đôi để bạn có thể theo dõi và sử dụng.
Ví dụ, nếu bạn có một dãy số (0, 1, 2, 3, 4…) và áp dụng HOOs, bạn không chỉ nhân đôi chúng, mà còn tạo ra một dãy số mới (0, 2, 4, 6, 8…) để bạn có thể sử dụng trong các tác vụ khác.
Nguồn gốc của các HOOs?
Nguồn gốc của các Higher Order Observables (HOOs) bắt nguồn từ việc ánh xạ (sử dụng operator map()
) trong RxJS. Khi chúng ta sử dụng map()
, chúng ta có thể biến đổi giá trị emit từ Source
Observable
sang một giá trị mới, và nếu giá trị này chính là một Observable
, chúng ta sẽ thu được một Higher Order Observable.
Ví dụ, khi chúng ta sử dụng map()
trong một trường hợp như sau:
fromEvent(document, "click")
.pipe(map(() => interval(1000)))
.subscribe(console.log);
Ở đây, mỗi lần click sẽ tạo ra một Observable
mới từ hàm interval()
, và kết quả là chúng ta nhận được Higher Order Observable, nghĩa là một Observable
chứa các Observable
khác.
Để xử lý Higher Order Observables này, chúng ta sử dụng ba operators chính: mergeAll()
, switchAll()
, và concatAll()
. Các operators này giúp chuyển đổi Higher Order Observable thành First Order Observable bằng cách subscribe vào các Observable
con mà map()
trả về.
const source = fromEvent(document, "click").pipe(map(() => interval(1000)));
source.pipe(mergeAll()).subscribe(console.log);
source.pipe(switchAll()).subscribe(console.log);
source.pipe(concatAll()).subscribe(console.log);
Cả ba operators merge/switch/concatAll
sẽ giúp chuyển các Higher Order Observables thành First Order Observables bằng cách subscribe vào các Observable
con từ map()
. Điều quan trọng là chúng ta sử dụng merge/switch/concatAll
để làm việc với các Higher Order Observables, mà chính là kết hợp giữa map()
và các operators này.Thông qua việc xử lý các Higher Order Observables, chúng ta có thể tìm hiểu thêm về cách mà merge/switch/concatAll
hoạt động và tính chất riêng biệt của chúng.
Tại sao lại cần HOOs?
Đôi khi, khi làm việc với RxJS, chúng ta gặp phải tình huống lồng nhau các subscription (Nested Subscription) có thể dẫn đến các vấn đề khó kiểm soát và làm cho ứng dụng không ổn định. Hãy xem xét ví dụ sau:
this.queryInput.valueChanges.pipe(debounceTime(500)).subscribe((query) => {
this.apiService.filterData(query).subscribe((data) => {
/*...*/
});
});
Ở đây, chúng ta theo dõi sự thay đổi của FormControl
queryInput
. Mỗi khi có thay đổi sau 500ms (debounceTime()
), chúng ta gọi hàm filterData
từ apiService
với query
mới. Tuy nhiên, cách tiếp cận này không hiệu quả và có thể gây ra vấn đề.
Nested Subscription không tốt vì khi chúng ta subscribe vào các Observable
lồng nhau, chúng hoạt động độc lập và không đồng bộ. Điều này dẫn đến các vấn đề như đua điều kiện (racing conditions) – nơi các request không hoàn thành theo đúng thứ tự mong muốn.
Ví dụ, khi người dùng thay đổi query nhanh chóng, mỗi lần thay đổi tạo ra một Observable
mới gọi đến filterData
. Nếu các yêu cầu không hoàn thành theo thứ tự, dữ liệu hiển thị sẽ không chính xác, vì chúng có thể không khớp với query hiện tại mà thay vào đó là kết quả của query trước đó.
Để tránh vấn đề này, chúng ta cần tìm hiểu và sử dụng các Higher Order Observables (HOOs). HOOs giúp quản lý các Observable lồng nhau và làm cho chúng đồng bộ hơn, giúp tránh vấn đề của Nested Subscription. Áp dụng HOOs sẽ giúp chúng ta quản lý và xử lý các luồng dữ liệu một cách hiệu quả hơn, tránh được các lỗi không mong muốn có thể xảy ra khi sử dụng Nested Subscription.
switchMap() trong RxJS
switchMap()
là một trong những công cụ quan trọng trong RxJS và Angular. Khi áp dụng switchMap()
, nó sẽ chuyển đổi giá trị được phát ra từ một Observable
gốc sang một Observable
mới được xác định bởi hàm projectFunction
. Kết quả cuối cùng của Observable
gốc khi sử dụng switchMap()
sẽ là giá trị mà Observable
mới phát ra.
Hãy xem ví dụ dưới đây:
fromEvent(document, "click").pipe(
switchMap(() => interval(1000).pipe(take(10)))
);
Ở đây, fromEvent(document, 'click')
tạo ra một Observable từ sự kiện click trên tài liệu. Mỗi lần click sẽ phát ra một giá trị (ví dụ, một sự kiện MouseEvent, nhưng ta không quan tâm chi tiết).
Câu lệnh interval(1000).pipe(take(10))
tạo ra một Observable phát ra giá trị mỗi giây và hoàn thành sau 10 giây. Điều này giả lập một yêu cầu mất 10 giây để hoàn thành, và sử dụng interval để theo dõi các giá trị phát ra thay vì sử dụng timer.
Chúng ta sử dụng switchMap()
với Observable gốc là fromEvent()
, sau đó trả về Observable mới là interval()
. Mỗi khi Observable gốc phát ra giá trị mới, switchMap()
sẽ huỷ đăng ký (unsubscribe) khỏi Observable trước đó và thay thế bằng Observable mới. Ví dụ, nếu click lần thứ hai xảy ra khi một Observable đang phát ra giá trị, switchMap()
sẽ ngay lập tức bỏ qua Observable đó và chuyển sang Observable mới từ click lần thứ hai.
Tính chất chính của switchMap()
là chỉ duy trì một đăng ký tại một thời điểm. Nó huỷ đăng ký vào Observable trước đó khi có Observable mới được tạo ra từ sự kiện mới của Observable gốc.
Với ví dụ trên, khi click lên tài liệu, Observable sẽ phát ra và bắt đầu đếm từ 0 đến 9 (sau đó hoàn thành) với mỗi click. Nếu click một lần nữa trong quá trình đếm, Observable hiện tại sẽ bị huỷ và bắt đầu đếm lại từ 0 cho Observable mới.
Sử dụng switchMap()
có thể giúp quản lý việc huỷ đăng ký khi có sự kiện mới từ Observable gốc, rất hữu ích khi xử lý các tác vụ như gửi yêu cầu HTTP trong Angular. Tuy nhiên, cần lưu ý sử dụng switchMap()
chỉ với các tác vụ như lấy dữ liệu, tránh sử dụng cho tạo, cập nhật, xóa để tránh xung đột dữ liệu. Trong trường hợp này, nên sử dụng mergeMap hoặc concatMap thay thế.
mergeMap() trong RxJS
mergeMap()
là một công cụ quan trọng trong RxJS, thường được sử dụng sau switchMap(). Khi sử dụng mergeMap()
, nó sẽ chuyển đổi giá trị được phát ra từ Observable gốc thành một Observable mới được xác định bởi hàm projectFunction. Kết quả cuối cùng của Observable gốc khi sử dụng mergeMap()
sẽ là giá trị được phát ra bởi Observable mới này.
Khác với switchMap()
, mergeMap()
sẽ không huỷ đăng ký (unsubscribe) từ Observable cũ khi có Observable mới. Thay vào đó, mergeMap()
sẽ duy trì nhiều đăng ký tại cùng một thời điểm. Điều này hữu ích khi bạn cần giữ các tác vụ mà không cần hoặc không phù hợp để dừng Observable nếu Observable gốc phát ra giá trị mới. Ví dụ, những nhiệm vụ liên quan đến việc ghi dữ liệu vào cơ sở dữ liệu, nơi mà switchMap()
sẽ phù hợp hơn khi thao tác đọc.
Xem xét đoạn mã dưới đây:
fromEvent(document, "click").pipe(
mergeMap(() => interval(1000).pipe(take(10)))
);
Ở đây, fromEvent(document, "click")
tạo ra một Observable từ sự kiện click trên tài liệu. Mỗi lần click sẽ tạo ra một Observable mới (ví dụ, MouseEvent), nhưng ở đây chúng ta chỉ quan tâm đến việc nhận giá trị khi có sự kiện click.
Câu lệnh interval(1000).pipe(take(10))
tạo ra một Observable mới phát ra giá trị mỗi giây và hoàn thành sau 10 giây. Điều này giả lập một yêu cầu mất 10 giây để hoàn thành, và sử dụng interval để theo dõi các giá trị phát ra thay vì sử dụng timer.
Chúng ta sử dụng mergeMap()
với Observable gốc là fromEvent()
, sau đó trả về Observable mới là interval()
. Mỗi khi Observable gốc phát ra giá trị mới (ví dụ như một sự kiện click), mergeMap()
sẽ tạo ra một Observable mới từ interval()
mà không ngắt kết nối với Observable trước đó. Điều này dẫn đến việc có thể có nhiều Observable được duy trì cùng một lúc. Trong ví dụ này, mỗi lần click sẽ tạo ra một Observable mới từ interval()
.
Như vậy, sự khác biệt chính giữa mergeMap()
và switchMap()
là mergeMap()
không ngắt kết nối với Observable cũ khi có Observable mới phát ra từ Observable gốc, tạo ra nhiều kết nối đồng thời. Điều này cần được chú ý để tránh tạo ra quá nhiều kết nối khi sử dụng mergeMap()
và đặc biệt lưu ý khi dừng mergeMap()
để tránh tình trạng rò rỉ bộ nhớ (Memory Leak).
Ngoài ra, mergeMap()
cũng có thể nhận thêm một tham số concurrent để điều khiển số lượng Observable có thể chạy đồng thời. Nếu bạn thiết lập concurrent = 1, nó sẽ hoạt động tương tự như concatMap.
concatMap() trong RxJS
concatMap()
là một công cụ mạnh mẽ trong RxJS, tương tự như mergeMap()
và switchMap()
. Nó cũng nhận vào một projectFunction và yêu cầu projectFunction này trả về một Inner Observable. Tuy nhiên, khác với mergeMap()
và switchMap()
, concatMap()
sẽ theo dõi và chờ cho đến khi Inner Observable hoàn thành trước khi subscribe vào Inner Observable tiếp theo (nếu có).
Xem xét ví dụ dưới đây:
fromEvent(document, "click").pipe(
concatMap(() => interval(1000).pipe(take(5))) // Giảm từ take(10) xuống take(5) để giảm đoạn mã 😅
);
Ở đây, fromEvent(document, "click")
tạo ra một Observable từ sự kiện click trên tài liệu. Mỗi lần click sẽ tạo ra một Observable mới (ví dụ, MouseEvent), và concatMap()
sẽ quan tâm đến việc nhận giá trị khi có sự kiện click.
interval(1000).pipe(take(5))
tạo ra một Observable mới phát ra giá trị mỗi giây và hoàn thành sau 5 giây. Điều này giả lập một yêu cầu mất 5 giây để hoàn thành.
Chúng ta sử dụng concatMap()
với Observable gốc là fromEvent()
, sau đó trả về Observable mới là interval()
. Mỗi khi Observable gốc phát ra giá trị mới (ví dụ như một sự kiện click), concatMap()
sẽ tạo ra một Observable mới từ interval()
và đợi cho đến khi Observable mới hoàn thành trước khi chuyển sang Observable tiếp theo. Điều này đảm bảo rằng các Observable sẽ được xử lý theo đúng thứ tự, mỗi Observable mới sẽ chờ cho đến khi Observable trước đó hoàn thành trước khi bắt đầu.
Như vậy, concatMap()
rất hữu ích trong các tình huống cần quan tâm đến thứ tự thực hiện, như khi điều khiển đèn giao thông, thực hiện các tác vụ theo thứ tự cụ thể như gọi số báo danh hoặc thậm chí trong nghiệp vụ đơn giản hơn như tải lên hình ảnh theo thứ tự.
Lưu ý: Trong trường hợp sử dụng concatMap()
với concatAll()
và map()
để xử lý Promise, nó có thể không hoạt động đúng nếu các bạn sử dụng Promise với tính chất eager. Ví dụ:
fromEvent(document, "click").pipe(
map(() => axios("...")),
concatAll()
);
Vì Promise được gọi ngay lập tức khi invoke, khiến cho concatAll()
vô nghĩa vì đã có request được gửi từ axios(...)
trước khi concatAll()
có thể xử lý thứ tự. Điều này có thể dẫn đến tình trạng Racing Condition trong nhiều trường hợp.
exhaustMap()
trong RxJS
exhaustMap()
là một công cụ khá đặc biệt trong RxJS. Tương tự như các HOO khác, nó cũng yêu cầu một projectFunction trả về một Inner Observable. Tuy nhiên, nó có một tính chất rất đặc biệt: khi đang xử lý một Inner Observable và nhận được một giá trị mới từ Outer Observable, nó sẽ bỏ qua Inner Observable mới này nếu Inner Observable hiện tại vẫn chưa hoàn thành (chưa complete).
Để hiểu rõ hơn, hãy xem ví dụ dưới đây:
concat(
timer(1000).pipe(mapTo("first timer"), tap(log)), // phát ra "first timer" sau 1 giây
timer(5000).pipe(mapTo("second timer"), tap(log)), // phát ra "second timer" sau 5 giây
timer(3000).pipe(mapTo("last timer"), tap(log)) // phát ra "last timer" sau 3 giây
)
.pipe(
exhaustMap((c) =>
interval(1000).pipe(
map((v) => `${c}: ${v}`),
take(4)
)
) // interval(1000) sẽ hoàn thành sau 4 giây
)
.subscribe(console.log);
Trong ví dụ này, có ba Inner Observable: “first timer”, “second timer”, và “last timer”. Mỗi Inner Observable này sẽ phát ra giá trị theo thời gian được chỉ định và có một Inner Observable mới là interval(1000)
phát ra giá trị mỗi giây và hoàn thành sau 4 giây.
Khi chạy, bạn sẽ thấy rằng exhaustMap()
chỉ chạy Inner Observable của “first timer” và không quan tâm đến “second timer” và “last timer” nếu Inner Observable của “first timer” chưa hoàn thành. Ngay khi Inner Observable của “first timer” hoàn thành, exhaustMap()
mới chạy Inner Observable của “second timer”.
Điều này làm cho exhaustMap()
trở thành một công cụ hiếm hoi trong việc kiểm soát tần suất phát ra giá trị.
Còn các HOO như switchMapTo()
, mergeMapTo()
, và concatMapTo()
, chúng tương tự như phiên bản gốc nhưng không nhận vào projectFunction mà trực tiếp là Inner Observable. Nếu bạn chỉ quan tâm đến việc sử dụng Inner Observable mà không cần giá trị từ Outer Observable, bạn có thể sử dụng các HOO này. Ví dụ:
fromEvent(document, "click").pipe(switchMapTo(interval(1000).pipe(take(10))));
fromEvent(document, "click").pipe(mergeMapTo(interval(1000).pipe(take(10))));
fromEvent(document, "click").pipe(concatMapTo(interval(1000).pipe(take(10))));
Tất cả các đoạn mã này sẽ sử dụng các HOO tương ứng để xử lý Inner Observable từ interval(1000)
mỗi khi có một sự kiện click, nhưng không quan tâm đến giá trị của sự kiện click.
switch/concat/mergeMapTo()
trong RxJS
Các HOO như switchMapTo()
, mergeMapTo()
, và concatMapTo()
là phiên bản đơn giản hóa của các HOO gốc tương ứng. Thay vì nhận một projectFunction, chúng chỉ cần nhận một Inner Observable trực tiếp. Nếu bạn chỉ cần sử dụng Inner Observable mà không cần quan tâm đến giá trị từ Outer Observable, các HOO này là lựa chọn lý tưởng.
Hãy xem ví dụ dưới đây:
fromEvent(document, "click").pipe(
switchMapTo(interval(1000).pipe(take(10)))
);
fromEvent(document, "click").pipe(
mergeMapTo(interval(1000).pipe(take(10)))
);
fromEvent(document, "click").pipe(
concatMapTo(interval(1000).pipe(take(10)))
);
Tất cả ba đoạn mã này sử dụng switchMapTo()
, mergeMapTo()
, và concatMapTo()
để xử lý Inner Observable từ interval(1000)
mỗi khi có một sự kiện click, nhưng không quan trọng giá trị của sự kiện click đó. Điều này giúp đơn giản hóa việc sử dụng RxJS khi bạn chỉ cần tập trung vào Inner Observable mà không cần quan tâm đến giá trị từ Outer Observable.
partition()
trong RxJS
partition()
không chỉ là một HOO thông thường, mà thực sự là một HOF (Higher Order Function). Mặc dù không thuộc loại ánh xạ giá trị nhưng nó vẫn nhận vào một nguồn dữ liệu (Source) và trả về không chỉ một, mà hai Observable mới. Khi sử dụng partition()
, bạn cần cung cấp hai tham số:
- Source Observable: Đây là nguồn dữ liệu mà bạn muốn chia thành hai Observable mới.
- predicateFunction: Đây là một hàm sẽ được gọi cho mỗi giá trị được phát ra từ Source Observable. Dựa vào kết quả của hàm này,
partition()
sẽ chia nguồn dữ liệu thành hai Destination Observables: một Observable chứa các giá trị thỏa điều kiện của hàm này và Observable còn lại chứa các giá trị không thỏa điều kiện đó.
const [even$, odd$] = partition(interval(1000), (x) => x % 2 === 1);
merge(
even$.pipe(map((x) => `even - ${x}`)),
odd$.pipe(map((x) => `odd - ${x}`))
).subscribe(console.log);
// even - 0
// odd - 1
// even - 2
// odd - 3
// ...
partition()
thực sự hữu ích khi bạn cần tách một luồng dữ liệu, ví dụ như các thông báo từ máy chủ, thành hai loại khác nhau để xử lý mỗi loại theo cách riêng.
Còn có một số HOO như expand()
, groupBy()
, và mergeScan()
trong RxJS, nhưng chúng ít được sử dụng hơn so với các HOO phổ biến như switchMap()
và concatMap()
. Tuy nhiên, chúng khá dễ hiểu hơn và bạn có thể tìm hiểu thêm về chúng một cách dễ dàng.
Utility Operators
Đây là những operators cung cấp 1 số tiện ích cho chúng ta mà đôi khi rất hiệu quả.
tap()
trong RxJS
tap()
là một operator khá phổ biến trong RxJS, thường được sử dụng để kiểm tra giá trị của Observable và thực hiện các tác vụ nhỏ mà không thay đổi giá trị của Observable. Bạn có thể sử dụng tap()
để:
- Ghi log giá trị: Đây là cách tốt để ghi log giá trị mà Observable phát ra, cho phép bạn kiểm tra giá trị trước và sau khi áp dụng các operator khác.
interval(1000)
.pipe(
tap((val) => console.log("trước map", val)),
map((val) => val * 2),
tap((val) => console.log("sau map", val))
)
.subscribe();
// trước map: 0
// sau map: 0
// trước map: 1
// sau map: 2
// trước map: 2
// sau map: 4
// ...
2. Thực hiện các thay đổi tùy ý: Bạn có thể thực hiện các thao tác với giá trị mà Observable phát ra mà không ảnh hưởng đến giá trị chính của Observable.
3. Thực hiện các tác vụ không liên quan đến giá trị: Đôi khi, bạn muốn thực hiện các hành động không liên quan đến giá trị mà Observable phát ra. Ví dụ, bạn có thể sử dụng tap()
để khởi động hoặc dừng một trạng thái như hiển thị hoặc ẩn loader.
tap()
không thay đổi giá trị của Observable mà nó được áp dụng. Thay vào đó, nó cho phép bạn kiểm tra giá trị, thực hiện các thay đổi phụ thuộc vào giá trị đó hoặc thực hiện các tác vụ không ảnh hưởng đến giá trị của Observable.
delay()/delayWhen()
Cả hai operator delay()
và delayWhen()
trong RxJS đều giúp hoãn việc phát ra các giá trị từ Observable.
delay()
:
- Nó hoãn việc phát ra giá trị từ Observable theo khoảng thời gian đã được chỉ định.
- Nếu bạn truyền vào một số, nó sẽ chờ một khoảng thời gian đó rồi mới phát ra giá trị.
- Nếu truyền vào một ngày cụ thể, nó sẽ hoãn việc phát ra giá trị đến khi thời gian hiện tại bằng thời điểm đó.
Ví dụ:
fromEvent(document, "click")
.pipe(delay(1000))
.subscribe(console.log);
// sau khi click, sẽ chờ 1 giây rồi mới phát ra giá trị MouseEvent
delayWhen()
:
- Nó cũng hoãn việc phát ra giá trị, nhưng thay vì truyền vào một khoảng thời gian, bạn truyền vào một hàm.
- Hàm này sẽ trả về một Observable, và
delayWhen()
sẽ hoãn việc phát ra giá trị từ Observable ban đầu cho đến khi Observable được trả về từ hàm này phát ra giá trị.
Ví dụ:
fromEvent(document, "click")
.pipe(delayWhen(() => timer(1000)))
.subscribe(console.log);
// sau khi click, sẽ chờ cho đến khi timer phát ra giá trị sau 1 giây, sau đó mới phát ra giá trị MouseEvent
Như vậy, delay()
và delayWhen()
đều giúp hoãn việc phát ra giá trị từ Observable, nhưng delayWhen()
cho phép bạn định nghĩa một quy tắc linh hoạt hơn để xác định thời điểm nào nên phát ra giá trị.
finalize() trong RxJS
finalize()
trong RxJS là một cách dễ dàng để thực hiện một hành động cuối cùng khi một Observable kết thúc, bao gồm cả khi nó hoàn thành thành công hay gặp lỗi.
Ví dụ phổ biến nhất của việc sử dụng finalize()
là khi chúng ta muốn dừng hiển thị biểu tượng hoặc thông báo loading sau khi một API request hoặc một công việc nào đó hoàn tất, cho dù có thành công hay gặp lỗi.
this.loading = true; // Hiển thị biểu tượng loading trước khi thực hiện request
this.apiService
.get()
.pipe(finalize(() => (this.loading = false))) // Dừng hiển thị loading sau khi request hoàn tất
.subscribe();
Ở đây, khi chúng ta gọi this.apiService.get()
, finalize()
sẽ giúp đảm bảo rằng sau khi request hoàn tất (hoặc gặp lỗi), hành động được chỉ định trong callback sẽ được thực hiện. Trong trường hợp này, nó giúp chuyển biểu tượng loading thành trạng thái ẩn khi request kết thúc, bất kể kết quả cuối cùng là thành công hay không.
repeat()
trong RxJS
Tưởng tượng bạn muốn lặp lại một Observable với dữ liệu cụ thể một số lần xác định. Đó chính là công dụng của repeat()
trong RxJS.
Với repeat()
, bạn có thể chỉ định số lần lặp lại mà Observable sẽ thực hiện. Đơn giản như việc gọi hàm repeat()
và truyền số lần bạn muốn lặp lại vào.
of("repeated data").pipe(repeat(3)).subscribe(console.log);
// 'repeated data'
// 'repeated data'
// 'repeated data'
Ở đây, khi chúng ta gọi repeat(3)
, Observable ban đầu sẽ được lặp lại ba lần với cùng dữ liệu là 'repeated data'
. Điều này giúp ta nhận được dữ liệu từ Observable theo số lần lặp mà ta đã chỉ định.
timeInterval()
trong RxJS
timeInterval()
trong RxJS thực sự hữu ích khi bạn muốn đo lường khoảng thời gian giữa các sự kiện trong Observable, như khoảng thời gian giữa các lần người dùng click chuột.
Khi bạn sử dụng timeInterval()
, nó sẽ bắt đầu tính toán thời gian từ thời điểm bạn subscribe vào Observable. Và khi có sự kiện được phát ra từ Observable, nó sẽ đo lường thời gian kể từ thời điểm bạn subscribe đến thời điểm sự kiện đó xảy ra.
fromEvent(document, "click")
.pipe(timeInterval())
.subscribe(console.log);
// click
// TimeInterval {value: MouseEvent, interval: 1000 }
Ở đây, khi người dùng thực hiện click lần đầu tiên sau khi bạn đã subscribe, timeInterval()
đưa ra thông tin về sự kiện (value: MouseEvent
) và thời gian từ lúc subscribe đến lúc sự kiện xảy ra (interval: 1000ms
). Điều này giúp bạn theo dõi và đo lường thời gian giữa các sự kiện trong Observable một cách dễ dàng.
timeout() trong RxJS
trong RxJS rất hữu ích khi bạn muốn đặt một giới hạn thời gian cho Observable để nó phải phát ra giá trị trong khoảng thời gian nhất định.
timeout()
Khi bạn sử dụng timeout()
, bạn cần truyền vào một khoảng thời gian, có thể là số Number đại diện cho milliseconds hoặc một ngày cụ thể dưới dạng Date. Nếu trong khoảng thời gian đó không có giá trị nào được phát ra từ Observable, timeout()
sẽ không chờ đợi mà sẽ kích hoạt một lỗi.
interval(2000)
.pipe(timeout(1000))
.subscribe(console.log, console.error);
// Error { name: "TimeoutError" }
Ở đây, chúng ta có một interval
với mỗi giá trị phát ra mỗi 2 giây. Tuy nhiên, với timeout
được đặt là 1000ms, nếu không có giá trị nào được phát ra trong khoảng thời gian này, Observable sẽ throw ra một lỗi TimeoutError
. Điều này giúp bạn kiểm soát thời gian phản hồi của Observable và xử lý khi không nhận được giá trị nào trong khoảng thời gian quy định.
timeoutWith()
trong RxJS
timeoutWith()
trong RxJS tương tự như timeout()
, nhưng khác biệt chính là thay vì kết thúc bằng một lỗi khi quá thời gian quy định, nó thực hiện một hành động khác.
Khi bạn sử dụng timeoutWith()
, bạn cung cấp một khoảng thời gian cùng với một Observable thứ hai. Nếu Source Observable không phát ra giá trị nào trong khoảng thời gian đó, thay vì kết thúc với một lỗi, nó sẽ thay đổi sang lắng nghe (subscribe) vào Observable thứ hai mà bạn đã cung cấp.
interval(2000)
.pipe(timeoutWith(1000, of('Fallback Value')))
.subscribe(console.log, console.error);
// Sau 1 giây:
// "Fallback Value"
Ở đây, chúng ta có một interval
phát ra một giá trị mỗi 2 giây. Nhưng với timeoutWith
, nếu không có giá trị nào phát ra trong 1000ms, nó sẽ thay đổi sang lắng nghe Observable thứ hai – trong trường hợp này là of('Fallback Value')
, phát ra giá trị ‘Fallback Value’. Điều này giúp giải quyết vấn đề khi không nhận được giá trị nào trong khoảng thời gian quy định của Observable.
toPromise()
trong RxJS
toPromise()
trong RxJS là một phương thức của class Observable dùng để chuyển đổi một Observable thành một Promise. Tên gợi nhớ cho chúng ta biết rằng hàm này thực hiện công việc gì.
async function test() {
const helloWorld = await of("hello")
.pipe(map((val) => val + " World"))
.toPromise();
console.log(helloWorld); // hello World
}
Ở ví dụ này, chúng ta sử dụng toPromise()
để chuyển đổi Observable thành Promise. Nó cho phép chúng ta sử dụng await
để chờ đợi giá trị của Observable sau khi đã áp dụng map()
và sau đó gán kết quả vào biến helloWorld
.
Cần lưu ý rằng toPromise()
sẽ bị deprecated trong phiên bản RxJS v7 sắp tới, do đó nên cẩn thận khi sử dụng và xem xét các cách thức khác thay thế. Trong số các Utility Operator, tap()
thường được sử dụng nhiều nhất để kiểm tra và debug quá trình của Observable.