hocvietcode.com
  • Trang chủ
  • Học lập trình
    • Lập trình C/C++
    • Lập trình HTML
    • Lập trình Javascript
      • Javascript cơ bản
      • ReactJS framework
      • AngularJS framework
      • Typescript cơ bản
      • Angular
    • Lập trình Mobile
      • Lập Trình Dart Cơ Bản
        • Dart Flutter Framework
    • Cơ sở dữ liệu
      • MySQL – MariaDB
      • Micrsoft SQL Server
      • Extensible Markup Language (XML)
      • JSON
    • Lập trình PHP
      • Lập trình PHP cơ bản
      • Laravel Framework
    • Lập trình Java
      • Java Cơ bản
    • Cấu trúc dữ liệu và giải thuật
    • Lập Trình C# Cơ Bản
    • Machine Learning
  • WORDPRESS
    • WordPress cơ bản
    • WordPress nâng cao
    • Chia sẻ WordPress
  • Kiến thức hệ thống
    • Microsoft Azure
    • Docker
    • Linux
  • Chia sẻ IT
    • Tin học văn phòng
      • Microsoft Word
      • Microsoft Excel
    • Marketing
      • Google Adwords
      • Facebook Ads
      • Kiến thức khác
    • Chia sẻ phần mềm
    • Review công nghệ
    • Công cụ – tiện ích
      • Kiểm tra bàn phím online
      • Kiểm tra webcam online
Đăng nhập
  • Đăng nhập / Đăng ký

Please enter key search to display results.

Home
  • Angular
  • Học lập trình
  • Lập trình Javascript
RxJS Subject và Multicasting trong RxJS

RxJS Subject và Multicasting trong RxJS

  • 06-01-2024
  • Toanngo92
  • 0 Comments

Mục lục

  • Observable Execution
  • Subject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject
  • Subject Completion
  • Multicasting
  • multicast
  • refCount
  • SubjectFactory
  • publish
  • share
  • shareReplay
  • Why use shareReplay?

Observable Execution

Thông thường, mỗi khi bạn subscribe vào một Observable, nó tạo ra một execution mới. Ví dụ:

const observable = interval(500).pipe(take(5));
const observerA = {
  next: (val) => console.log(`Observer A: ${val}`),
  error: (err) => console.log(`Observer A Error: ${err}`),
  complete: () => console.log(`Observer A complete`),
};
observable.subscribe(observerA);

// Output:
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: 3
// Observer A: 4
// Observer A complete

Nếu sau đó bạn subscribe một observer khác sau một khoảng thời gian, bạn sẽ thấy rằng các execution là hoàn toàn riêng biệt:

const observable = interval(500).pipe(take(5));
const observerA = {
  next: (val) => console.log(`Observer A: ${val}`),
  error: (err) => console.log(`Observer A Error: ${err}`),
  complete: () => console.log(`Observer A complete`),
};
observable.subscribe(observerA);

const observerB = {
  next: (val) => console.log(`Observer B: ${val}`),
  error: (err) => console.log(`Observer B Error: ${err}`),
  complete: () => console.log(`Observer B complete`),
};

setTimeout(() => {
  observable.subscribe(observerB);
}, 2000);

// Output:
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: 3
// Observer A: 4
// Observer A complete
// Observer B: 0
// Observer B: 1
// Observer B: 2
// Observer B: 3
// Observer B: 4
// Observer B complete

Nhưng có cách nào để chia sẻ một execution giữa các observer không? RxJS sử dụng một design pattern gọi là Observer Pattern.

Chúng ta có thể tạo một hybrid observer, cho phép chúng ta chia sẻ execution giữa các observers:

const hybridObserver = {
  observers: [],
  registerObserver(observer) {
    this.observers.push(observer);
  },
  next(value) {
    this.observers.forEach(observer => observer.next(value));
  },
  error(err) {
    this.observers.forEach(observer => observer.error(err));
  },
  complete() {
    this.observers.forEach(observer => observer.complete());
  }
};

hybridObserver.registerObserver(observerA);
observable.subscribe(hybridObserver);

setTimeout(() => {
  hybridObserver.registerObserver(observerB);
}, 2000);

// Output:
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: 3
// Observer A: 4
// Observer B: 0
// Observer A complete
// Observer B: 1
// Observer B: 2
// Observer B: 3
// Observer B: 4
// Observer B complete

Một cách khác tương tự, chúng ta có thể sử dụng Subject, một loại đặc biệt của Observable, để chia sẻ execution giữa các observers:

const subject = new Subject();

subject.subscribe(observerA);
observable.subscribe(subject);

setTimeout(() => {
  subject.subscribe(observerB);
}, 2000);

// Output:
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: 3
// Observer A: 4
// Observer B: 0
// Observer A complete
// Observer B: 1
// Observer B: 2
// Observer B: 3
// Observer B: 4
// Observer B complete

Sử dụng Subject cho phép chúng ta chuyển từ unicast Observable execution sang multicast, cho phép các observers chia sẻ cùng một execution.

Subject

Chúng ta có thứ gọi là Subject trong RxJS. Điều đặc biệt là nó không chỉ là một Observable mà còn là một Observer nữa. Điều này có nghĩa là ta có thể đăng ký để nhận thông tin từ nó cũng như gửi thông tin tới nó.

Ví dụ, giả sử chúng ta đang xây dựng một tính năng tìm kiếm realtime. Ta có thể sử dụng Subject như một “Event Bus” để quản lý thông tin tìm kiếm.

import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { throttleTime, distinctUntilChanged } from 'rxjs/operators';

@Component({
  selector: "my-app",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"],
})
export class AppComponent implements OnInit {
  searchTerm$ = new Subject<string>();

  ngOnInit() {
    this.searchTerm$
      .pipe(
        throttleTime(250, undefined, {
          leading: true,
          trailing: true,
        }),
        distinctUntilChanged()
      )
      .subscribe({
        next: (value) => console.log(value),
      });
  }

  onInput(event: Event) {
    const target = event.target as HTMLInputElement;
    this.searchTerm$.next(target.value);
  }
}

Ở đoạn code trên, searchTerm$ là một Subject. Khi ta gửi thông điệp tìm kiếm mới thông qua onInput, nó sẽ gửi thông điệp đó tới searchTerm$.

Việc thú vị là ta có thể áp dụng các toán tử từ RxJS như throttleTime hoặc distinctUntilChanged để kiểm soát luồng thông tin tới searchTerm$, giúp quản lý thông tin tìm kiếm theo cách mà ta muốn.

BehaviorSubject

Khi sử dụng Subject, một vấn đề thường gặp là khi có người subscribe muộn, họ sẽ không nhận được các giá trị đã được phát ra trong quá khứ mà chỉ nhận được các giá trị mới sau khi họ đăng ký.

Hãy xem xét ví dụ sau:

import { Subject, BehaviorSubject } from 'rxjs';

const subject = new Subject();
subject.subscribe({
  next: (v) => console.log("observerA: " + v),
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log("observerB: " + v),
});

subject.next(3);

Kết quả hiển thị sẽ là:

observerA: 1
observerA: 2
observerA: 3
observerB: 3

Để giải quyết vấn đề này, chúng ta có một biến thể của Subject gọi là BehaviorSubject. Điểm đặc biệt của nó là việc lưu trữ giá trị hiện tại, và khi một Observer mới subscribe vào, nó sẽ tự động phát ra giá trị hiện tại đó cho Observer đó.

BehaviorSubject thường được sử dụng để biểu diễn “giá trị qua thời gian”. Ví dụ, một luồng sự kiện về ngày sinh là một Subject, trong khi luồng về tuổi của một người sẽ là BehaviorSubject.

Ví dụ, bạn có thể sử dụng BehaviorSubject để chia sẻ thông tin về người dùng hiện tại đang đăng nhập trong hệ thống với các component khác nhau trong Angular.

Lưu ý quan trọng: BehaviorSubject yêu cầu phải có giá trị khởi tạo khi bạn tạo ra subject.

const subject = new BehaviorSubject(0); // 0 là giá trị khởi tạo
subject.subscribe({
  next: (v) => console.log("observerA: " + v),
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log("observerB: " + v),
});

subject.next(3);

Kết quả khi chạy đoạn mã trên sẽ là:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

Như vậy, nhờ BehaviorSubject, ObserverB đã nhận được giá trị hiện tại (là 2) ngay khi nó đăng ký, còn ObserverA cũng nhận được giá trị khởi tạo và tất cả các giá trị mới được phát ra.

ReplaySubject

ReplaySubject là một dạng của Subject trong đó có khả năng “phát lại” hoặc gửi những giá trị cũ cho các Observer mới đăng ký. Nó giữ một bộ nhớ đệm với số lượng giá trị cụ thể và sẽ phát ra những giá trị đó ngay lập tức cho bất kỳ Observer mới nào, cùng với việc phát ra các giá trị mới cho các Observer hiện tại.

Khi tạo một ReplaySubject, bạn có thể chỉ định các tham số sau:

  • buffer: Số lượng giá trị tối đa mà nó có thể lưu trữ.
  • windowTime: (ms) Thời gian tối đa tính từ thời điểm gần nhất mà giá trị được phát ra.
const subject = new ReplaySubject(3); // lưu trữ 3 giá trị cho các subscriber mới
subject.subscribe({
  next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
  next: (v) => console.log("observerB: " + v),
});
subject.next(5);

Kết quả sẽ là:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

Hoặc kết hợp cả buffer và windowTime:

const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
  next: (v) => console.log("observerA: " + v),
});
let i = 1;
const id = setInterval(() => subject.next(i++), 200);
setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log("observerB: " + v),
  });
}, 1000);
setTimeout(() => {
  subject.complete();
  clearInterval(id);
}, 2000);

Kết quả sẽ hiển thị:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

Trong ví dụ này, sau 1 giây, chỉ có giá trị 3, 4 và 5 được phát ra trong 500ms gần nhất và nằm trong buffer, do đó nó được phát lại cho observerB.

AsyncSubject


AsyncSubject là một dạng của Subject đặc biệt chỉ phát ra giá trị cuối cùng của quá trình thực thi của Observable khi nó hoàn thành và kết thúc. Nó sẽ phát ra giá trị cuối cùng của nó cho tất cả các Observer khi quá trình hoàn thành.

Nếu Observable không hoàn thành, AsyncSubject sẽ không phát ra bất kỳ giá trị nào.

Nó khá tương tự như Promise, vì chỉ phát ra giá trị sau khi quá trình hoàn thành.

const subject = new AsyncSubject();
subject.subscribe({
  next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
  next: (v) => console.log("observerB: " + v),
});
subject.next(5);
subject.complete();

Kết quả sẽ là:

observerA: 5
observerB: 5

Ở đây, giá trị cuối cùng là 5, và nó chỉ được phát ra khi complete() được gọi. Cả hai observer đều nhận được giá trị này sau khi quá trình hoàn thành.

Subject Completion

Khi một Subject hoàn thành (complete), các Observer mà subscribe sau đó sẽ có hành vi khác nhau tùy thuộc vào loại Subject đó.

Với BehaviorSubject khi hoàn thành, các Observer mới sẽ chỉ nhận được thông báo hoàn thành (complete signal).

const subject = new BehaviorSubject(0);
subject.subscribe({
  next: (v) => console.log("observerA: " + v),
  complete: () => console.log("observerA: done"),
});
subject.next(1);
subject.next(2);
subject.subscribe({
  next: (v) => console.log("observerB: " + v),
  complete: () => console.log("observerB: done"),
});
subject.next(3);
subject.complete();
subject.subscribe({
  next: (v) => console.log("observerC: " + v),
  complete: () => console.log("observerC: done"),
});

Kết quả:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
observerA: done
observerB: done
observerC: done

Với ReplaySubject khi hoàn thành, các Observer mới sẽ nhận tất cả các giá trị lưu trữ trong buffer, sau đó mới nhận thông báo hoàn thành.

const subject = new ReplaySubject(3);
// ... (code phần còn lại giống như trong ví dụ của bạn)

Kết quả:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerA: done
observerB: 3
observerB: 4
observerB: 5
observerB: done

Với AsyncSubject khi hoàn thành, dù đã hoàn thành rồi nhưng các Observer vẫn có thể nhận được giá trị cuối cùng khi nó complete.

const subject = new AsyncSubject();
// ... (code phần còn lại giống như trong ví dụ của bạn)

Kết quả:

observerA: 5
observerA: done
observerB: 5
observerB: done

Multicasting


Để multicast (chia sẻ) một observable execution cho nhiều observers, chúng ta có thể sử dụng operator multicast. Đây là cách để chia sẻ một observable sequence và định nghĩa cách chúng được multicast. Dưới đây là cách thực hiện điều này:

import { interval } from 'rxjs';
import { take, multicast } from 'rxjs/operators';

const observable = interval(500).pipe(take(5));
const subject = new Subject();

const observerA = {
  next: (val) => console.log(`Observer A: ${val}`),
  error: (err) => console.log(`Observer A Error: ${err}`),
  complete: () => console.log(`Observer A complete`),
};

const observerB = {
  next: (val) => console.log(`Observer B: ${val}`),
  error: (err) => console.log(`Observer B Error: ${err}`),
  complete: () => console.log(`Observer B complete`),
};

const multicasted = observable.pipe(multicast(subject));

multicasted.subscribe(observerA);
multicasted.connect(); // Bắt đầu kết nối observable

setTimeout(() => {
  multicasted.subscribe(observerB);
}, 2000);

Ở đây, chúng ta sử dụng operator multicast để chia sẻ observable observable cho subject, và sau đó chúng ta subscribe các observers vào multicasted. Bằng cách này, các observers sẽ nhận được giá trị từ cùng một execution của observable thay vì tạo ra các execution độc lập khi mới subscribe. Phương thức connect() được gọi để bắt đầu kết nối observable và bắt đầu phát ra các giá trị.

multicast

Operator multicast trong RxJS là một cách để chia sẻ một Observable sequence và định nghĩa cách thức nó được multicast, tức là share cùng một execution cho nhiều observers.

Trong ví dụ dưới đây, chúng ta sử dụng multicast với một instance của Subject để tạo ra một ConnectableObservable, sau đó gọi hàm connect() để kích hoạt Observable.

import { interval, ConnectableObservable, Subject } from 'rxjs';
import { take, multicast } from 'rxjs/operators';

const subject = new Subject();
const connectableObservable = interval(500).pipe(
  take(5),
  multicast(subject)
) as ConnectableObservable<number>;

const observerA = {
  next: (val) => console.log(`Observer A: ${val}`),
  error: (err) => console.log(`Observer A Error: ${err}`),
  complete: () => console.log(`Observer A complete`),
};

const observerB = {
  next: (val) => console.log(`Observer B: ${val}`),
  error: (err) => console.log(`Observer B Error: ${err}`),
  complete: () => console.log(`Observer B complete`),
};

const sub = connectableObservable.subscribe(observerA);
const connectSub = connectableObservable.connect();

setTimeout(() => {
  sub.add(connectableObservable.subscribe(observerB));
}, 2000);

setTimeout(() => {
  sub.unsubscribe();
  connectSub.unsubscribe();
}, 3000);

Chúng ta cần lưu trữ subscription được trả về từ việc gọi connect() để có thể unsubscribe sau này. Khi chúng ta gọi connect(), nó tương đương với việc subscribe internal Subject vào Observable như ban đầu: observable.subscribe(subject).

Lưu ý rằng, nếu không unsubscribe đúng cách, có thể gây memory leak. Trong ví dụ trên, chúng ta sử dụng sub.add() để thêm subscription vào một subscription khác và sử dụng sub.unsubscribe() và connectSub.unsubscribe() để unsubscribe cả hai. Tuy nhiên, chỉ cần connectSub.unsubscribe() là đã đủ để unsubscribe internal Subject, nên không cần phải gọi sub.unsubscribe() cũng được.

refCount

Với refCount, bạn không cần phải quản lý việc kết nối và ngắt kết nối một cách thủ công. Nó tự động thực hiện kết nối khi có người đăng ký (subscriber) và ngắt kết nối khi không còn người đăng ký nào nữa.

import { interval, ConnectableObservable, Subject } from 'rxjs';
import { tap, multicast, refCount } from 'rxjs/operators';

const subject = new Subject();
const connectableObservable = interval(500).pipe(
  tap((x) => console.log("log.info: " + x)),
  multicast(subject)
) as ConnectableObservable<number>;

const observerA = {
  next: (val) => console.log(`Observer A: ${val}`),
  error: (err) => console.log(`Observer A Error: ${err}`),
  complete: () => console.log(`Observer A complete`),
};

const observerB = {
  next: (val) => console.log(`Observer B: ${val}`),
  error: (err) => console.log(`Observer B Error: ${err}`),
  complete: () => console.log(`Observer B complete`),
};

const observable = connectableObservable.pipe(refCount());
const subA = observable.subscribe(observerA); // Khi có người đăng ký, kết nối được tự động thiết lập từ 0 lên 1
let subB;
setTimeout(() => {
  subB = observable.subscribe(observerB); // Khi có người đăng ký thứ hai, kết nối tăng lên từ 1 lên 2
}, 2000);
setTimeout(() => {
  subA.unsubscribe(); // Khi một người đăng ký hủy đăng ký, kết nối giảm từ 2 xuống 1
}, 3000);
setTimeout(() => {
  subB.unsubscribe(); // Khi không còn ai đăng ký nữa, kết nối giảm từ 1 xuống 0
}, 5000);

Bằng cách sử dụng refCount(), nó sẽ tự động quản lý số lượng người đăng ký và kết nối tới Observable. Khi có người đăng ký thì số lượng kết nối tăng lên và khi họ hủy đăng ký, số lượng kết nối giảm đi. Điều này giúp loại bỏ việc phải quản lý kết nối thủ công, giảm thiểu rủi ro của việc quên ngắt kết nối khi không cần thiết nữa.

SubjectFactory

Khi một Subject đã hoàn thành (complete), nó không thể nhận thêm bất kỳ giá trị mới nào. Điều này có nghĩa là sau khi Subject kết thúc, không thể thực hiện thêm bất kỳ lệnh nào khác theo sau.

Đoạn code dưới đây tạo một connectableObservable từ một chuỗi các số tăng dần mỗi 500 milliseconds trong khoảng thời gian nhất định và sau đó chia sẻ dữ liệu này đến các subscription khác nhau.

const connectableObservable = interval(500).pipe(
  take(10),
  tap((x) => console.log("log.info: " + x)),
  multicast(new Subject())
) as ConnectableObservable<number>;

const observerA = {
  next: (val) => console.log(`Observer A: ${val}`),
  error: (err) => console.log(`Observer A Error: ${err}`),
  complete: () => console.log(`Observer A complete`),
};

const observerB = {
  next: (val) => console.log(`Observer B: ${val}`),
  error: (err) => console.log(`Observer B Error: ${err}`),
  complete: () => console.log(`Observer B complete`),
};

const sharedObservable = connectableObservable.refCount();
const subA = sharedObservable.subscribe(observerA);
let subB;

setTimeout(() => {
  subB = sharedObservable.subscribe(observerB);
}, 2000);

setTimeout(() => {
  const subA2 = sharedObservable.subscribe(observerA);
}, 6000);

Sau khi sharedObservable kết thúc sau 5 giây, nó không thể gửi thêm bất kỳ giá trị nào. Khi chúng ta cố gắng subscribe vào nó ở thời điểm 6 giây, không có giá trị nào được nhận thêm. Để tiếp tục nhận giá trị, chúng ta cần tạo một Subject mới, và đó chính là lúc ta sử dụng SubjectFactory.

const connectableObservable = interval(500).pipe(
  take(10),
  tap((x) => console.log("log.info: " + x)),
  multicast(() => new Subject())
) as ConnectableObservable<number>;

// SubjectFactory là một hàm sẽ tạo ra một Subject mới khi được gọi.
// Khi refCount thay đổi từ 0 => 1, nó sẽ được gọi và bắt đầu chạy.
const sharedObservable = connectableObservable.refCount();
const subA = sharedObservable.subscribe(observerA);
let subB;

setTimeout(() => {
  subB = sharedObservable.subscribe(observerB);
}, 2000);

setTimeout(() => {
  const subA2 = sharedObservable.subscribe(observerA);
}, 6000);

publish


Khi ta dùng multicast(new Subject()), có thể viết gọn lại bằng cách sử dụng publish. publish chuyển một Observable lạnh thành nóng, tức là nó chỉ bắt đầu phát ra các giá trị khi phương thức connect của nó được gọi.

Dưới đây là một ví dụ sử dụng publish:

const connectableObservable = interval(500).pipe(
  tap((x) => console.log("log.info: " + x)),
  publish()
) as ConnectableObservable<number>;

const observerA = {
  next: (val) => console.log(`Observer A: ${val}`),
  error: (err) => console.log(`Observer A Error: ${err}`),
  complete: () => console.log(`Observer A complete`),
};

const observerB = {
  next: (val) => console.log(`Observer B: ${val}`),
  error: (err) => console.log(`Observer B Error: ${err}`),
  complete: () => console.log(`Observer B complete`),
};

const sharedObservable = connectableObservable.refCount();
const subA = sharedObservable.subscribe(observerA); // ref từ 0 => 1

let subB;
setTimeout(() => {
  subB = sharedObservable.subscribe(observerB); // ref từ 1 => 2
}, 2000);

setTimeout(() => {
  subA.unsubscribe(); // ref từ 2 => 1
}, 3000);

setTimeout(() => {
  subB.unsubscribe(); // ref từ 1 => 0
}, 5000);

Hơn nữa, publish cũng có các biến thể tương ứng với các loại Subject khác như:

  • BehaviorSubject => publishBehavior
  • ReplaySubject => publishReplay
  • AsyncSubject => publishLast

Các biến thể này tương ứng với việc chuyển đổi các loại Subject tương ứng sang dạng hot Observable.

share

Khi sử dụng multicast(() => new Subject()) cùng refCount, thường thì có một cách tiếp cận ngắn gọn hơn là sử dụng share.

Dưới đây là một ví dụ sử dụng share:

const sharedObservable = interval(500).pipe(
  tap((x) => console.log("log.info: " + x)),
  share()
);

const observerA = {
  next: (val) => console.log(`Observer A: ${val}`),
  error: (err) => console.log(`Observer A Error: ${err}`),
  complete: () => console.log(`Observer A complete`),
};

const observerB = {
  next: (val) => console.log(`Observer B: ${val}`),
  error: (err) => console.log(`Observer B Error: ${err}`),
  complete: () => console.log(`Observer B complete`),
};

const subA = sharedObservable.subscribe(observerA); // ref từ 0 => 1

let subB;
setTimeout(() => {
  subB = sharedObservable.subscribe(observerB); // ref từ 1 => 2
}, 2000);

setTimeout(() => {
  subA.unsubscribe(); // ref từ 2 => 1
}, 3000);

setTimeout(() => {
  subB.unsubscribe(); // ref từ 1 => 0
}, 5000);

share giúp tạo ra một Observable mới, nhận dữ liệu từ Observable gốc và chia sẻ nó đi. Khi có ít nhất một Subscriber, Observable này sẽ được kích hoạt và phát ra dữ liệu. Khi tất cả các Subscriber đã hủy đăng ký, nó sẽ ngừng theo dõi từ Observable nguồn. Việc sử dụng share làm cho luồng trở nên “nóng” (hot), có nghĩa là khi có Subscriber, dữ liệu sẽ được phát ngay lập tức. Đây là một cách ngắn gọn hơn để thực hiện việc tạo một Observable chia sẻ với các Subscriber.

shareReplay

shareReplay là một cách để chia sẻ nguồn dữ liệu và lưu trữ một số lượng xác định của các giá trị đã phát ra khi có subscription. Khi bạn subscribe vào Observable này, bạn không chỉ nhận được dữ liệu hiện tại mà còn các giá trị gần đây nhất. Điều này giúp tránh việc mất dữ liệu quan trọng khi có subscription sau. Nếu nguồn dữ liệu hoàn thành thành công, shareReplayed sẽ lưu trữ dữ liệu đó mãi mãi. Tuy nhiên, nếu có lỗi xảy ra từ nguồn dữ liệu, bạn vẫn có thể thử lại việc lấy dữ liệu từ nguồn đó.

const sharedReplayedObservable = sourceObservable.pipe(
  shareReplay({ bufferSize: 3, windowTime: 5000 })
);

// Làm thế nào để hiểu shareReplay?
// Đơn giản, shareReplay giúp chia sẻ dữ liệu từ nguồn và lưu lại một số lượng cụ thể
// của các giá trị đã phát ra khi có subscription.

// Khi bạn subscribe, bạn nhận được không chỉ dữ liệu hiện tại mà còn các giá trị gần đây nhất.
// Điều này giúp tránh việc mất dữ liệu quan trọng khi có subscription muộn.

// Nếu nguồn phát ra dữ liệu hoàn thành thành công, shareReplayed sẽ lưu trữ dữ liệu đó mãi mãi.
// Nhưng nếu có lỗi xảy ra từ nguồn, bạn vẫn có thể thử lại việc lấy dữ liệu từ nguồn đó.

Why use shareReplay?

shareReplay thường được sử dụng khi bạn có các tác động phụ (side-effects) hoặc tính toán tốn kém mà bạn không muốn thực hiện lại cho nhiều subscriber. Nó cũng có giá trị khi bạn biết rằng bạn sẽ có các subscriber đến sau vào một luồng (stream) cần truy cập vào các giá trị đã phát ra trước đó. Khả năng phát lại (replay) các giá trị khi có subscription là điều làm nên sự khác biệt giữa share và shareReplay.

Ví dụ một trường hợp sử dụng phổ biến là để làm cache như sau:

import { Injectable } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import { timer } from "rxjs/observable/timer";
import { switchMap, shareReplay, map, takeUntil } from "rxjs/operators";

export interface Joke {
  id: number;
  joke: string;
  categories: Array<string>;
}

export interface JokeResponse {
  type: string;
  value: Array<Joke>;
}

const API_ENDPOINT = "https://api.icndb.com/jokes/random/5?limitTo=[nerdy]";
const REFRESH_INTERVAL = 10000;
const CACHE_SIZE = 1;

@Injectable()
export class JokeService {
  private cache$: Observable<Array<Joke>>;
  private reload$ = new Subject<void>();

  constructor(private http: HttpClient) {}

  // Phương thức này chịu trách nhiệm cho việc lấy dữ liệu.
  // Người đầu tiên gọi hàm này sẽ khởi tạo
  // quá trình lấy dữ liệu.
  get jokes() {
    if (!this.cache$) {
      // Thiết lập timer chạy mỗi X mili giây
      const timer$ = timer(0, REFRESH_INTERVAL);
      // Mỗi khi timer chạy, thực hiện request HTTP để lấy dữ liệu mới
      // Sử dụng shareReplay(X) để chia sẻ cache giữa các subscriber
      // và không tạo lại nguồn dữ liệu liên tục. Sử dụng takeUntil
      // để hoàn thành luồng này khi người dùng yêu cầu cập nhật.
      this.cache$ = timer$.pipe(
        switchMap(() => this.requestJokes()),
        takeUntil(this.reload$),
        shareReplay(CACHE_SIZE)
      );
    }
    return this.cache$;
  }

  // API công khai để bắt buộc cache tải lại dữ liệu
  forceReload() {
    this.reload$.next();
    this.cache$ = null;
  }

  // Phương thức hỗ trợ thực hiện request để lấy các câu chuyện hài hước
  private requestJokes() {
    return this.http
      .get<JokeResponse>(API_ENDPOINT)
      .pipe(map((response) => response.value));
  }
}

Trong ví dụ này, shareReplay được sử dụng để lưu trữ và chia sẻ dữ liệu về các câu chuyện hài hước từ một API. Khi có yêu cầu, Observable jokes sẽ lấy dữ liệu mới từ API và cache lại. Khi có các subscriber đến sau, chúng có thể truy cập vào các giá trị đã phát ra trước đó mà không cần thực hiện lại việc lấy dữ liệu từ API. Phương thức forceReload được cung cấp để yêu cầu cache tải lại dữ liệu mới từ nguồn.

Bài viết liên quan:

Sắp xếp sủi bọt – Bubble Sort
TypeScript với Kiểu Dữ Liệu Cơ Bản – 3
TypeScript với Kiểu Dữ Liệu Cơ Bản – 2
TypeScript với Kiểu Dữ Liệu Cơ Bản – 1
Typescript cơ bản và cách cài đặt cho người mới
Thực Hành Micro Frontends
Dynamic Component trong Angular
Async Validator trong Angular Form
Reactive Forms Trong Angular (Phần 2)
Reactive Forms Trong Angular (Phần 1)
Template-driven Forms Trong Angular (Phần 2)
Template-driven Forms Trong Angular (Phần 1)

THÊM BÌNH LUẬN Cancel reply

Dịch vụ thiết kế Wesbite

NỘI DUNG MỚI CẬP NHẬT

2. PHÂN TÍCH VÀ ĐẶC TẢ HỆ THỐNG

1. TỔNG QUAN KIẾN THỨC THỰC HÀNH TRIỂN KHAI DỰ ÁN CÔNG NGHỆ THÔNG TIN

Hướng dẫn tự cài đặt n8n comunity trên CyberPanel, trỏ tên miền

Mẫu prompt tạo mô tả chi tiết bối cảnh

Một số cải tiến trong ASP.NET Core, Razor Page, Model Binding, Gabbage collection

Giới thiệu

hocvietcode.com là website chia sẻ và cập nhật tin tức công nghệ, chia sẻ kiến thức, kỹ năng. Chúng tôi rất cảm ơn và mong muốn nhận được nhiều phản hồi để có thể phục vụ quý bạn đọc tốt hơn !

Liên hệ quảng cáo: [email protected]

Kết nối với HỌC VIẾT CODE

© hocvietcode.com - Tech888 Co .Ltd since 2019

Đăng nhập

Trở thành một phần của cộng đồng của chúng tôi!
Registration complete. Please check your email.
Đăng nhập bằng google
Đăng kýBạn quên mật khẩu?

Create an account

Welcome! Register for an account
The user name or email address is not correct.
Registration confirmation will be emailed to you.
Log in Lost your password?

Reset password

Recover your password
Password reset email has been sent.
The email could not be sent. Possible reason: your host may have disabled the mail function.
A password will be e-mailed to you.
Log in Register
×