RxJS Subject và Multicasting trong RxJS
- 06-01-2024
- Toanngo92
- 0 Comments
Mục lục
Observable Execution
Thông thường, mỗi khi bạn subscribe vào một Observable, nó tạo ra một execution mới. Ví dụ:
const observable = interval(500).pipe(take(5));
const observerA = {
next: (val) => console.log(`Observer A: ${val}`),
error: (err) => console.log(`Observer A Error: ${err}`),
complete: () => console.log(`Observer A complete`),
};
observable.subscribe(observerA);
// Output:
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: 3
// Observer A: 4
// Observer A complete
Nếu sau đó bạn subscribe một observer khác sau một khoảng thời gian, bạn sẽ thấy rằng các execution là hoàn toàn riêng biệt:
const observable = interval(500).pipe(take(5));
const observerA = {
next: (val) => console.log(`Observer A: ${val}`),
error: (err) => console.log(`Observer A Error: ${err}`),
complete: () => console.log(`Observer A complete`),
};
observable.subscribe(observerA);
const observerB = {
next: (val) => console.log(`Observer B: ${val}`),
error: (err) => console.log(`Observer B Error: ${err}`),
complete: () => console.log(`Observer B complete`),
};
setTimeout(() => {
observable.subscribe(observerB);
}, 2000);
// Output:
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: 3
// Observer A: 4
// Observer A complete
// Observer B: 0
// Observer B: 1
// Observer B: 2
// Observer B: 3
// Observer B: 4
// Observer B complete
Nhưng có cách nào để chia sẻ một execution giữa các observer không? RxJS sử dụng một design pattern gọi là Observer Pattern.
Chúng ta có thể tạo một hybrid observer, cho phép chúng ta chia sẻ execution giữa các observers:
const hybridObserver = {
observers: [],
registerObserver(observer) {
this.observers.push(observer);
},
next(value) {
this.observers.forEach(observer => observer.next(value));
},
error(err) {
this.observers.forEach(observer => observer.error(err));
},
complete() {
this.observers.forEach(observer => observer.complete());
}
};
hybridObserver.registerObserver(observerA);
observable.subscribe(hybridObserver);
setTimeout(() => {
hybridObserver.registerObserver(observerB);
}, 2000);
// Output:
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: 3
// Observer A: 4
// Observer B: 0
// Observer A complete
// Observer B: 1
// Observer B: 2
// Observer B: 3
// Observer B: 4
// Observer B complete
Một cách khác tương tự, chúng ta có thể sử dụng Subject
, một loại đặc biệt của Observable, để chia sẻ execution giữa các observers:
const subject = new Subject();
subject.subscribe(observerA);
observable.subscribe(subject);
setTimeout(() => {
subject.subscribe(observerB);
}, 2000);
// Output:
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: 3
// Observer A: 4
// Observer B: 0
// Observer A complete
// Observer B: 1
// Observer B: 2
// Observer B: 3
// Observer B: 4
// Observer B complete
Sử dụng Subject
cho phép chúng ta chuyển từ unicast Observable execution sang multicast, cho phép các observers chia sẻ cùng một execution.
Subject
Chúng ta có thứ gọi là Subject
trong RxJS. Điều đặc biệt là nó không chỉ là một Observable mà còn là một Observer nữa. Điều này có nghĩa là ta có thể đăng ký để nhận thông tin từ nó cũng như gửi thông tin tới nó.
Ví dụ, giả sử chúng ta đang xây dựng một tính năng tìm kiếm realtime. Ta có thể sử dụng Subject
như một “Event Bus” để quản lý thông tin tìm kiếm.
import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { throttleTime, distinctUntilChanged } from 'rxjs/operators';
@Component({
selector: "my-app",
templateUrl: "./app.component.html",
styleUrls: ["./app.component.css"],
})
export class AppComponent implements OnInit {
searchTerm$ = new Subject<string>();
ngOnInit() {
this.searchTerm$
.pipe(
throttleTime(250, undefined, {
leading: true,
trailing: true,
}),
distinctUntilChanged()
)
.subscribe({
next: (value) => console.log(value),
});
}
onInput(event: Event) {
const target = event.target as HTMLInputElement;
this.searchTerm$.next(target.value);
}
}
Ở đoạn code trên, searchTerm$
là một Subject
. Khi ta gửi thông điệp tìm kiếm mới thông qua onInput
, nó sẽ gửi thông điệp đó tới searchTerm$
.
Việc thú vị là ta có thể áp dụng các toán tử từ RxJS như throttleTime
hoặc distinctUntilChanged
để kiểm soát luồng thông tin tới searchTerm$
, giúp quản lý thông tin tìm kiếm theo cách mà ta muốn.
BehaviorSubject
Khi sử dụng Subject, một vấn đề thường gặp là khi có người subscribe muộn, họ sẽ không nhận được các giá trị đã được phát ra trong quá khứ mà chỉ nhận được các giá trị mới sau khi họ đăng ký.
Hãy xem xét ví dụ sau:
import { Subject, BehaviorSubject } from 'rxjs';
const subject = new Subject();
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(3);
Kết quả hiển thị sẽ là:
observerA: 1
observerA: 2
observerA: 3
observerB: 3
Để giải quyết vấn đề này, chúng ta có một biến thể của Subject gọi là BehaviorSubject. Điểm đặc biệt của nó là việc lưu trữ giá trị hiện tại, và khi một Observer mới subscribe vào, nó sẽ tự động phát ra giá trị hiện tại đó cho Observer đó.
BehaviorSubject thường được sử dụng để biểu diễn “giá trị qua thời gian”. Ví dụ, một luồng sự kiện về ngày sinh là một Subject, trong khi luồng về tuổi của một người sẽ là BehaviorSubject.
Ví dụ, bạn có thể sử dụng BehaviorSubject để chia sẻ thông tin về người dùng hiện tại đang đăng nhập trong hệ thống với các component khác nhau trong Angular.
Lưu ý quan trọng: BehaviorSubject yêu cầu phải có giá trị khởi tạo khi bạn tạo ra subject.
const subject = new BehaviorSubject(0); // 0 là giá trị khởi tạo
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(3);
Kết quả khi chạy đoạn mã trên sẽ là:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
Như vậy, nhờ BehaviorSubject, ObserverB đã nhận được giá trị hiện tại (là 2) ngay khi nó đăng ký, còn ObserverA cũng nhận được giá trị khởi tạo và tất cả các giá trị mới được phát ra.
ReplaySubject
ReplaySubject là một dạng của Subject trong đó có khả năng “phát lại” hoặc gửi những giá trị cũ cho các Observer mới đăng ký. Nó giữ một bộ nhớ đệm với số lượng giá trị cụ thể và sẽ phát ra những giá trị đó ngay lập tức cho bất kỳ Observer mới nào, cùng với việc phát ra các giá trị mới cho các Observer hiện tại.
Khi tạo một ReplaySubject, bạn có thể chỉ định các tham số sau:
- buffer: Số lượng giá trị tối đa mà nó có thể lưu trữ.
- windowTime: (ms) Thời gian tối đa tính từ thời điểm gần nhất mà giá trị được phát ra.
const subject = new ReplaySubject(3); // lưu trữ 3 giá trị cho các subscriber mới
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(5);
Kết quả sẽ là:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
Hoặc kết hợp cả buffer và windowTime:
const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
let i = 1;
const id = setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
}, 1000);
setTimeout(() => {
subject.complete();
clearInterval(id);
}, 2000);
Kết quả sẽ hiển thị:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
Trong ví dụ này, sau 1 giây, chỉ có giá trị 3, 4 và 5 được phát ra trong 500ms gần nhất và nằm trong buffer, do đó nó được phát lại cho observerB.
AsyncSubject
AsyncSubject là một dạng của Subject đặc biệt chỉ phát ra giá trị cuối cùng của quá trình thực thi của Observable khi nó hoàn thành và kết thúc. Nó sẽ phát ra giá trị cuối cùng của nó cho tất cả các Observer khi quá trình hoàn thành.
Nếu Observable không hoàn thành, AsyncSubject sẽ không phát ra bất kỳ giá trị nào.
Nó khá tương tự như Promise, vì chỉ phát ra giá trị sau khi quá trình hoàn thành.
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(5);
subject.complete();
Kết quả sẽ là:
observerA: 5
observerB: 5
Ở đây, giá trị cuối cùng là 5, và nó chỉ được phát ra khi complete()
được gọi. Cả hai observer đều nhận được giá trị này sau khi quá trình hoàn thành.
Subject Completion
Khi một Subject hoàn thành (complete), các Observer mà subscribe sau đó sẽ có hành vi khác nhau tùy thuộc vào loại Subject đó.
Với BehaviorSubject khi hoàn thành, các Observer mới sẽ chỉ nhận được thông báo hoàn thành (complete signal
).
const subject = new BehaviorSubject(0);
subject.subscribe({
next: (v) => console.log("observerA: " + v),
complete: () => console.log("observerA: done"),
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
complete: () => console.log("observerB: done"),
});
subject.next(3);
subject.complete();
subject.subscribe({
next: (v) => console.log("observerC: " + v),
complete: () => console.log("observerC: done"),
});
Kết quả:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
observerA: done
observerB: done
observerC: done
Với ReplaySubject khi hoàn thành, các Observer mới sẽ nhận tất cả các giá trị lưu trữ trong buffer
, sau đó mới nhận thông báo hoàn thành.
const subject = new ReplaySubject(3);
// ... (code phần còn lại giống như trong ví dụ của bạn)
Kết quả:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerA: done
observerB: 3
observerB: 4
observerB: 5
observerB: done
Với AsyncSubject khi hoàn thành, dù đã hoàn thành rồi nhưng các Observer vẫn có thể nhận được giá trị cuối cùng khi nó complete.
const subject = new AsyncSubject();
// ... (code phần còn lại giống như trong ví dụ của bạn)
Kết quả:
observerA: 5
observerA: done
observerB: 5
observerB: done
Multicasting
Để multicast (chia sẻ) một observable execution cho nhiều observers, chúng ta có thể sử dụng operator multicast
. Đây là cách để chia sẻ một observable sequence và định nghĩa cách chúng được multicast. Dưới đây là cách thực hiện điều này:
import { interval } from 'rxjs';
import { take, multicast } from 'rxjs/operators';
const observable = interval(500).pipe(take(5));
const subject = new Subject();
const observerA = {
next: (val) => console.log(`Observer A: ${val}`),
error: (err) => console.log(`Observer A Error: ${err}`),
complete: () => console.log(`Observer A complete`),
};
const observerB = {
next: (val) => console.log(`Observer B: ${val}`),
error: (err) => console.log(`Observer B Error: ${err}`),
complete: () => console.log(`Observer B complete`),
};
const multicasted = observable.pipe(multicast(subject));
multicasted.subscribe(observerA);
multicasted.connect(); // Bắt đầu kết nối observable
setTimeout(() => {
multicasted.subscribe(observerB);
}, 2000);
Ở đây, chúng ta sử dụng operator multicast
để chia sẻ observable observable
cho subject
, và sau đó chúng ta subscribe các observers vào multicasted
. Bằng cách này, các observers sẽ nhận được giá trị từ cùng một execution của observable thay vì tạo ra các execution độc lập khi mới subscribe. Phương thức connect()
được gọi để bắt đầu kết nối observable và bắt đầu phát ra các giá trị.
multicast
Operator multicast
trong RxJS là một cách để chia sẻ một Observable sequence và định nghĩa cách thức nó được multicast, tức là share cùng một execution cho nhiều observers.
Trong ví dụ dưới đây, chúng ta sử dụng multicast
với một instance của Subject để tạo ra một ConnectableObservable
, sau đó gọi hàm connect()
để kích hoạt Observable.
import { interval, ConnectableObservable, Subject } from 'rxjs';
import { take, multicast } from 'rxjs/operators';
const subject = new Subject();
const connectableObservable = interval(500).pipe(
take(5),
multicast(subject)
) as ConnectableObservable<number>;
const observerA = {
next: (val) => console.log(`Observer A: ${val}`),
error: (err) => console.log(`Observer A Error: ${err}`),
complete: () => console.log(`Observer A complete`),
};
const observerB = {
next: (val) => console.log(`Observer B: ${val}`),
error: (err) => console.log(`Observer B Error: ${err}`),
complete: () => console.log(`Observer B complete`),
};
const sub = connectableObservable.subscribe(observerA);
const connectSub = connectableObservable.connect();
setTimeout(() => {
sub.add(connectableObservable.subscribe(observerB));
}, 2000);
setTimeout(() => {
sub.unsubscribe();
connectSub.unsubscribe();
}, 3000);
Chúng ta cần lưu trữ subscription được trả về từ việc gọi connect()
để có thể unsubscribe sau này. Khi chúng ta gọi connect()
, nó tương đương với việc subscribe internal Subject vào Observable như ban đầu: observable.subscribe(subject)
.
Lưu ý rằng, nếu không unsubscribe đúng cách, có thể gây memory leak. Trong ví dụ trên, chúng ta sử dụng sub.add()
để thêm subscription vào một subscription khác và sử dụng sub.unsubscribe()
và connectSub.unsubscribe()
để unsubscribe cả hai. Tuy nhiên, chỉ cần connectSub.unsubscribe()
là đã đủ để unsubscribe internal Subject, nên không cần phải gọi sub.unsubscribe()
cũng được.
refCount
Với refCount
, bạn không cần phải quản lý việc kết nối và ngắt kết nối một cách thủ công. Nó tự động thực hiện kết nối khi có người đăng ký (subscriber) và ngắt kết nối khi không còn người đăng ký nào nữa.
import { interval, ConnectableObservable, Subject } from 'rxjs';
import { tap, multicast, refCount } from 'rxjs/operators';
const subject = new Subject();
const connectableObservable = interval(500).pipe(
tap((x) => console.log("log.info: " + x)),
multicast(subject)
) as ConnectableObservable<number>;
const observerA = {
next: (val) => console.log(`Observer A: ${val}`),
error: (err) => console.log(`Observer A Error: ${err}`),
complete: () => console.log(`Observer A complete`),
};
const observerB = {
next: (val) => console.log(`Observer B: ${val}`),
error: (err) => console.log(`Observer B Error: ${err}`),
complete: () => console.log(`Observer B complete`),
};
const observable = connectableObservable.pipe(refCount());
const subA = observable.subscribe(observerA); // Khi có người đăng ký, kết nối được tự động thiết lập từ 0 lên 1
let subB;
setTimeout(() => {
subB = observable.subscribe(observerB); // Khi có người đăng ký thứ hai, kết nối tăng lên từ 1 lên 2
}, 2000);
setTimeout(() => {
subA.unsubscribe(); // Khi một người đăng ký hủy đăng ký, kết nối giảm từ 2 xuống 1
}, 3000);
setTimeout(() => {
subB.unsubscribe(); // Khi không còn ai đăng ký nữa, kết nối giảm từ 1 xuống 0
}, 5000);
Bằng cách sử dụng refCount()
, nó sẽ tự động quản lý số lượng người đăng ký và kết nối tới Observable. Khi có người đăng ký thì số lượng kết nối tăng lên và khi họ hủy đăng ký, số lượng kết nối giảm đi. Điều này giúp loại bỏ việc phải quản lý kết nối thủ công, giảm thiểu rủi ro của việc quên ngắt kết nối khi không cần thiết nữa.
SubjectFactory
Khi một Subject đã hoàn thành (complete), nó không thể nhận thêm bất kỳ giá trị mới nào. Điều này có nghĩa là sau khi Subject kết thúc, không thể thực hiện thêm bất kỳ lệnh nào khác theo sau.
Đoạn code dưới đây tạo một connectableObservable
từ một chuỗi các số tăng dần mỗi 500 milliseconds trong khoảng thời gian nhất định và sau đó chia sẻ dữ liệu này đến các subscription khác nhau.
const connectableObservable = interval(500).pipe(
take(10),
tap((x) => console.log("log.info: " + x)),
multicast(new Subject())
) as ConnectableObservable<number>;
const observerA = {
next: (val) => console.log(`Observer A: ${val}`),
error: (err) => console.log(`Observer A Error: ${err}`),
complete: () => console.log(`Observer A complete`),
};
const observerB = {
next: (val) => console.log(`Observer B: ${val}`),
error: (err) => console.log(`Observer B Error: ${err}`),
complete: () => console.log(`Observer B complete`),
};
const sharedObservable = connectableObservable.refCount();
const subA = sharedObservable.subscribe(observerA);
let subB;
setTimeout(() => {
subB = sharedObservable.subscribe(observerB);
}, 2000);
setTimeout(() => {
const subA2 = sharedObservable.subscribe(observerA);
}, 6000);
Sau khi sharedObservable
kết thúc sau 5 giây, nó không thể gửi thêm bất kỳ giá trị nào. Khi chúng ta cố gắng subscribe vào nó ở thời điểm 6 giây, không có giá trị nào được nhận thêm. Để tiếp tục nhận giá trị, chúng ta cần tạo một Subject mới, và đó chính là lúc ta sử dụng SubjectFactory
.
const connectableObservable = interval(500).pipe(
take(10),
tap((x) => console.log("log.info: " + x)),
multicast(() => new Subject())
) as ConnectableObservable<number>;
// SubjectFactory là một hàm sẽ tạo ra một Subject mới khi được gọi.
// Khi refCount thay đổi từ 0 => 1, nó sẽ được gọi và bắt đầu chạy.
const sharedObservable = connectableObservable.refCount();
const subA = sharedObservable.subscribe(observerA);
let subB;
setTimeout(() => {
subB = sharedObservable.subscribe(observerB);
}, 2000);
setTimeout(() => {
const subA2 = sharedObservable.subscribe(observerA);
}, 6000);
publish
Khi ta dùng multicast(new Subject())
, có thể viết gọn lại bằng cách sử dụng publish
. publish
chuyển một Observable lạnh thành nóng, tức là nó chỉ bắt đầu phát ra các giá trị khi phương thức connect của nó được gọi.
Dưới đây là một ví dụ sử dụng publish
:
const connectableObservable = interval(500).pipe(
tap((x) => console.log("log.info: " + x)),
publish()
) as ConnectableObservable<number>;
const observerA = {
next: (val) => console.log(`Observer A: ${val}`),
error: (err) => console.log(`Observer A Error: ${err}`),
complete: () => console.log(`Observer A complete`),
};
const observerB = {
next: (val) => console.log(`Observer B: ${val}`),
error: (err) => console.log(`Observer B Error: ${err}`),
complete: () => console.log(`Observer B complete`),
};
const sharedObservable = connectableObservable.refCount();
const subA = sharedObservable.subscribe(observerA); // ref từ 0 => 1
let subB;
setTimeout(() => {
subB = sharedObservable.subscribe(observerB); // ref từ 1 => 2
}, 2000);
setTimeout(() => {
subA.unsubscribe(); // ref từ 2 => 1
}, 3000);
setTimeout(() => {
subB.unsubscribe(); // ref từ 1 => 0
}, 5000);
Hơn nữa, publish
cũng có các biến thể tương ứng với các loại Subject khác như:
BehaviorSubject
=>publishBehavior
ReplaySubject
=>publishReplay
AsyncSubject
=>publishLast
Các biến thể này tương ứng với việc chuyển đổi các loại Subject tương ứng sang dạng hot Observable.
share
Khi sử dụng multicast(() => new Subject())
cùng refCount
, thường thì có một cách tiếp cận ngắn gọn hơn là sử dụng share
.
Dưới đây là một ví dụ sử dụng share
:
const sharedObservable = interval(500).pipe(
tap((x) => console.log("log.info: " + x)),
share()
);
const observerA = {
next: (val) => console.log(`Observer A: ${val}`),
error: (err) => console.log(`Observer A Error: ${err}`),
complete: () => console.log(`Observer A complete`),
};
const observerB = {
next: (val) => console.log(`Observer B: ${val}`),
error: (err) => console.log(`Observer B Error: ${err}`),
complete: () => console.log(`Observer B complete`),
};
const subA = sharedObservable.subscribe(observerA); // ref từ 0 => 1
let subB;
setTimeout(() => {
subB = sharedObservable.subscribe(observerB); // ref từ 1 => 2
}, 2000);
setTimeout(() => {
subA.unsubscribe(); // ref từ 2 => 1
}, 3000);
setTimeout(() => {
subB.unsubscribe(); // ref từ 1 => 0
}, 5000);
share
giúp tạo ra một Observable mới, nhận dữ liệu từ Observable gốc và chia sẻ nó đi. Khi có ít nhất một Subscriber, Observable này sẽ được kích hoạt và phát ra dữ liệu. Khi tất cả các Subscriber đã hủy đăng ký, nó sẽ ngừng theo dõi từ Observable nguồn. Việc sử dụng share
làm cho luồng trở nên “nóng” (hot), có nghĩa là khi có Subscriber, dữ liệu sẽ được phát ngay lập tức. Đây là một cách ngắn gọn hơn để thực hiện việc tạo một Observable chia sẻ với các Subscriber.
shareReplay
shareReplay
là một cách để chia sẻ nguồn dữ liệu và lưu trữ một số lượng xác định của các giá trị đã phát ra khi có subscription. Khi bạn subscribe vào Observable này, bạn không chỉ nhận được dữ liệu hiện tại mà còn các giá trị gần đây nhất. Điều này giúp tránh việc mất dữ liệu quan trọng khi có subscription sau. Nếu nguồn dữ liệu hoàn thành thành công, shareReplayed
sẽ lưu trữ dữ liệu đó mãi mãi. Tuy nhiên, nếu có lỗi xảy ra từ nguồn dữ liệu, bạn vẫn có thể thử lại việc lấy dữ liệu từ nguồn đó.
const sharedReplayedObservable = sourceObservable.pipe(
shareReplay({ bufferSize: 3, windowTime: 5000 })
);
// Làm thế nào để hiểu shareReplay?
// Đơn giản, shareReplay giúp chia sẻ dữ liệu từ nguồn và lưu lại một số lượng cụ thể
// của các giá trị đã phát ra khi có subscription.
// Khi bạn subscribe, bạn nhận được không chỉ dữ liệu hiện tại mà còn các giá trị gần đây nhất.
// Điều này giúp tránh việc mất dữ liệu quan trọng khi có subscription muộn.
// Nếu nguồn phát ra dữ liệu hoàn thành thành công, shareReplayed sẽ lưu trữ dữ liệu đó mãi mãi.
// Nhưng nếu có lỗi xảy ra từ nguồn, bạn vẫn có thể thử lại việc lấy dữ liệu từ nguồn đó.
Why use shareReplay?
shareReplay
thường được sử dụng khi bạn có các tác động phụ (side-effects) hoặc tính toán tốn kém mà bạn không muốn thực hiện lại cho nhiều subscriber. Nó cũng có giá trị khi bạn biết rằng bạn sẽ có các subscriber đến sau vào một luồng (stream) cần truy cập vào các giá trị đã phát ra trước đó. Khả năng phát lại (replay) các giá trị khi có subscription là điều làm nên sự khác biệt giữa share
và shareReplay
.
Ví dụ một trường hợp sử dụng phổ biến là để làm cache như sau:
import { Injectable } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import { timer } from "rxjs/observable/timer";
import { switchMap, shareReplay, map, takeUntil } from "rxjs/operators";
export interface Joke {
id: number;
joke: string;
categories: Array<string>;
}
export interface JokeResponse {
type: string;
value: Array<Joke>;
}
const API_ENDPOINT = "https://api.icndb.com/jokes/random/5?limitTo=[nerdy]";
const REFRESH_INTERVAL = 10000;
const CACHE_SIZE = 1;
@Injectable()
export class JokeService {
private cache$: Observable<Array<Joke>>;
private reload$ = new Subject<void>();
constructor(private http: HttpClient) {}
// Phương thức này chịu trách nhiệm cho việc lấy dữ liệu.
// Người đầu tiên gọi hàm này sẽ khởi tạo
// quá trình lấy dữ liệu.
get jokes() {
if (!this.cache$) {
// Thiết lập timer chạy mỗi X mili giây
const timer$ = timer(0, REFRESH_INTERVAL);
// Mỗi khi timer chạy, thực hiện request HTTP để lấy dữ liệu mới
// Sử dụng shareReplay(X) để chia sẻ cache giữa các subscriber
// và không tạo lại nguồn dữ liệu liên tục. Sử dụng takeUntil
// để hoàn thành luồng này khi người dùng yêu cầu cập nhật.
this.cache$ = timer$.pipe(
switchMap(() => this.requestJokes()),
takeUntil(this.reload$),
shareReplay(CACHE_SIZE)
);
}
return this.cache$;
}
// API công khai để bắt buộc cache tải lại dữ liệu
forceReload() {
this.reload$.next();
this.cache$ = null;
}
// Phương thức hỗ trợ thực hiện request để lấy các câu chuyện hài hước
private requestJokes() {
return this.http
.get<JokeResponse>(API_ENDPOINT)
.pipe(map((response) => response.value));
}
}
Trong ví dụ này, shareReplay
được sử dụng để lưu trữ và chia sẻ dữ liệu về các câu chuyện hài hước từ một API. Khi có yêu cầu, Observable jokes
sẽ lấy dữ liệu mới từ API và cache lại. Khi có các subscriber đến sau, chúng có thể truy cập vào các giá trị đã phát ra trước đó mà không cần thực hiện lại việc lấy dữ liệu từ API. Phương thức forceReload
được cung cấp để yêu cầu cache tải lại dữ liệu mới từ nguồn.