Giới thiệu Reactive Programming, RxJS và Observable
- 26-12-2023
- Toanngo92
- 0 Comments
Mục lục
Observable trong Angular
Trong lập trình thông thường, chúng ta thường sử dụng Array để lưu trữ nhiều phần tử. Ví dụ như trong JavaScript, chúng ta có thể sử dụng các phương thức như map, filter, reduce, every, some để thao tác với Array một cách linh hoạt.
Tuy nhiên, trong thế giới lập trình bất đồng bộ, chúng ta cần xử lý các sự kiện không đồng bộ như Event hoặc Promise. Những sự kiện này có thể xảy ra bất cứ lúc nào trong tương lai mà ta không thể biết trước. Chúng ta có thể gửi lời nhắc và khi có sự kiện xảy ra, chương trình sẽ gọi lại lời nhắc đó (callback) cho chúng ta.
Observable, một khái niệm trong lập trình reactive, vượt trội không chỉ vì nó xử lý được nhiều giá trị bất đồng bộ mà còn vì cách tiếp cận kiến trúc của nó. Observable coi mọi thứ như là các dòng dữ liệu, cho phép dữ liệu chảy qua các luồng xử lý để biến đổi thành dạng mà chúng ta mong muốn.
Observable có thể được coi là một loạt các giá trị theo thời gian:
- Giá trị qua thời gian
Hiện tại, Observable chưa được chính thức tích hợp vào JavaScript, nhưng bạn có thể sử dụng thư viện RxJS để tạo các thành phần như Observable, Observer, Subject và một loạt các toán tử (operators) đi kèm. Nhờ đó, việc xử lý dòng dữ liệu trở nên dễ dàng hơn.
Use-case throttle
Thường khi có các sự kiện xảy ra quá nhanh và nhiều, ta muốn giữ lại chỉ những giá trị quan trọng và bỏ qua những giá trị trung gian. Ví dụ, bạn có một nút trong ứng dụng và muốn tránh việc người dùng click liên tục vào nút đó quá nhanh, chẳng hạn 500ms. Nếu họ click quá nhanh, ta chỉ muốn tương tác với lần click gần nhất và bỏ qua những click trước đó trong khoảng thời gian đó.
Bây giờ chúng ta sẽ thử cách tiếp cận thông thường và sử dụng RxJS để giải quyết vấn đề trên. Hãy xem xét một số đoạn mã RxJS dưới đây:
import { fromEvent } from "rxjs";
import { throttleTime, scan } from "rxjs/operators";
const btnrxjsThrottle = document.querySelector("#rxjsThrottle");
fromEvent(btnrxjsThrottle, "click")
.pipe(
throttleTime(500),
scan((count) => count + 1, 0)
)
.subscribe((count) => console.log(`RxJS: Clicked ${count} times`));
Với cách sử dụng RxJS trong bài toán trên, ta có thể linh hoạt xử lý và biến đổi dữ liệu một cách dễ dàng hơn, nếu cần thiết.
RxJS core concepts trong Angular
Observable trong Angular
Observable trong Angular là một khái niệm quan trọng, nó tượng trưng cho một bộ sưu tập các giá trị hoặc sự kiện mà sẽ xảy ra trong tương lai. Khi có các giá trị hoặc sự kiện này xuất hiện, Observable sẽ chuyển chúng đến người quan sát (Observer).
Một Observable đơn giản chỉ là một hàm (hoặc class) có một số đặc điểm đặc biệt. Nó nhận một hàm làm đối số, hàm này nhận một người quan sát và trả về một hàm để hủy bỏ việc theo dõi (unsubscribe). Thông thường, chúng ta đặt tên cho hàm này là “unsubscribe” (từ RxJS 5 trở lên).
Observer
Observer là một tập hợp các callback tương ứng với việc nghe và xử lý các giá trị (như next, error, hoặc complete) được gửi từ Observable.
Subscription
Subscription là kết quả thu được sau khi chúng ta thực hiện một Observable. Thường được sử dụng để hủy bỏ việc theo dõi.
Operators
Operators là những hàm thuần túy cho phép chúng ta lập trình theo kiểu functional với Observable, giúp biến đổi và xử lý dữ liệu dễ dàng.
Subject
Subject là công cụ để gửi dữ liệu đến nhiều người quan sát cùng lúc (multicasting).
Schedulers
Schedulers quyết định khi nào một subscription sẽ bắt đầu thực thi và khi nào nó sẽ gửi tín hiệu đi.
Working with Observables
Creating Observables
Để tạo một Observable, chúng ta có thể sử dụng constructor của nó và truyền vào một hàm (gọi là subscribe). Trong hàm subscribe này, chúng ta sẽ nhận một đối tượng Observer.
const observable = new Observable(function subscribe(observer) {
const id = setTimeout(() => {
observer.next("Hello RxJS");
observer.complete();
}, 1000);
return function unsubscribe() {
clearTimeout(id);
};
});
Hàm subscribe này có thể chứa các công việc chúng ta muốn thực hiện, và để dọn dẹp sau khi kết thúc, chúng ta có thể trả về một hàm unsubscribe từ bên trong hàm subscribe. Hàm này sẽ giúp chúng ta dọn dẹp và ngừng các tác vụ đang diễn ra khi cần thiết.
Invoking Observable
Các Observable thường hoạt động giống như một function, nghĩa là nó chỉ được “kích hoạt” khi bạn gọi nó, giống như việc gọi một hàm (lazy computation).
Để “kích hoạt” một Observable, bạn sử dụng hàm subscribe. Khi bạn subscribe vào nó, Observable sẽ trả về một Subscription.
const subscription = observable.subscribe({
next: (value) => {
console.log(value);
},
error: (error) => {
console.log(error);
},
complete: () => {
console.log("Hoàn thành");
},
});
Ở đoạn code trên, chúng ta đã cung cấp đầy đủ 3 hàm callback cho Observer. Tuy nhiên, bạn có thể không cần cung cấp hoặc chỉ cần một số trong số các callback này tùy theo nhu cầu cụ thể.
Executing Observables
Khi chúng ta tạo một Observable bằng cách sử dụng cú pháp new Observable(function subscribe(observer) {…}), đoạn mã này thường được gọi là “Observable execution”.
Khi chúng ta bắt đầu thực thi một Observable bằng cách gọi hàm subscribe, nó sẽ bắt đầu chạy. Khi một tín hiệu nào đó (next, error, complete) được gửi đi từ Observable, các hàm mà chúng ta cung cấp cho Observer sẽ được gọi để xử lý tín hiệu đó.
Có ba loại tín hiệu mà một Observable Execution có thể gửi đi:
- “Next” notification: Gửi đi một giá trị, có thể là bất kỳ kiểu dữ liệu nào như Number, String, Object, v.v.
- “Error” notification: Gửi đi một lỗi JavaScript hoặc ngoại lệ.
- “Complete” notification: Không gửi đi bất kỳ giá trị nào, nhưng thông báo rằng luồng này đã hoàn tất, cho biết Observer có thể thực hiện một hành động cuối cùng khi luồng hoàn tất.
Thông thường, “Next” notifications được sử dụng phổ biến nhất, vì chúng chứa dữ liệu quan trọng mà Observer cần.
Error và Complete notifications chỉ có thể xảy ra một lần duy nhất trong một Observable Execution.
Chú ý rằng, chỉ có một trong hai loại tín hiệu này được gửi đi, nếu đã hoàn tất thì không có lỗi, nếu có lỗi thì không có hoàn tất. Và sau khi gửi đi tín hiệu hoàn tất hoặc lỗi, không có dữ liệu nào được gửi tiếp theo. Điều này có nghĩa rằng luồng đã kết thúc.
Trong một Observable Execution, có thể gửi đi từ không đến vô hạn tín hiệu “Next”. Nếu đã gửi đi thông báo lỗi hoặc hoàn tất, thì không thể gửi đi bất kỳ tín hiệu nào khác sau đó.
Disposing Observable Executions
Khi bạn sử dụng Observable để thực hiện một quá trình có thể lặp lại hoặc trong trường hợp bạn muốn ngừng việc thực thi vì dữ liệu không còn cần thiết nữa – ví dụ như đóng kết nối websocket hoặc bỏ đi các sự kiện từ một phần tử DOM không còn tồn tại nữa.
Observable cung cấp cơ chế để bạn có thể hủy việc thực thi. Khi bạn gọi hàm subscribe, một Observer sẽ được kết nối với quá trình thực thi mới của Observable và trả về một đối tượng Subscription. Đối tượng này có một phương thức là unsubscribe, khi bạn gọi nó Observable sẽ ngừng thực thi.
Lưu ý: Nếu bạn tự tạo Observable (ví dụ như bằng cách sử dụng new Observable), bạn cần tự quản lý việc hủy thực thi.
Khi bạn subscribe, bạn nhận được một Subscription, đại diện cho việc thực thi đang diễn ra. Chỉ cần gọi unsubscribe() để hủy việc thực thi.
const subscription = observable.subscribe({
next: (value) => {
console.log(value);
},
error: (error) => {
console.log(error);
},
complete: () => {
console.log("Done");
},
});
setTimeout(() => {
subscription.unsubscribe();
}, 500);
Ở đoạn mã trên, sau 500ms, hàm unsubscribe sẽ được gọi để ngừng việc thực thi của Observable.
Observers
Observers là những đơn vị tiêu thụ dữ liệu được gửi từ Observable. Chúng được biểu diễn bằng các đối tượng chứa ba hàm callback tương ứng với mỗi loại thông báo được gửi từ Observable: next, error, complete.
Một Observer có thể có một hoặc vài callbacks từ ba callbacks kể trên (có thể là một đối tượng không có callbacks nào từ ba callbacks trên, nhưng điều này ít được sử dụng).
Ngoài cách sử dụng truyền một đối tượng Observer vào hàm subscribe để kích hoạt Observable execution, bạn cũng có thể truyền các hàm riêng lẻ. Tuy nhiên, cần lưu ý rằng bạn cần truyền các hàm theo đúng thứ tự tương ứng với next, error, complete.
Mặc dù cách này vẫn hoạt động, nhưng hiện tại không được khuyến nghị sử dụng. Bạn chỉ nên sử dụng việc truyền từng hàm riêng lẻ nếu bạn chỉ muốn xử lý thông báo “Next”.
observable.subscribe(
(x) => console.log("Observer got a next value: " + x),
(err) => console.error("Observer got an error: " + err),
() => console.log("Observer got a complete notification")
);
// Tương đương với:
const observer = {
next: (x) => console.log("Observer got a next value: " + x),
error: (err) => console.error("Observer got an error: " + err),
complete: () => console.log("Observer got a complete notification"),
};
observable.subscribe(observer);
Lưu ý: Nếu bạn không muốn xử lý error handler function, bạn có thể truyền null/undefined thay vì một hàm vào vị trí của error handler function.
Subscription trong Angular
Subscription trong Angular là một đối tượng đại diện cho một tài nguyên có thể hủy, thường được sử dụng để hủy việc thực thi của Observable trong RxJS. Điển hình trong các Subscription là method quan trọng unsubscribe
(có từ RxJS 5 trở lên), khi gọi method này, việc thực thi sẽ bị ngừng.
Ví dụ, giả sử chúng ta có một đồng hồ đếm thời gian gửi một giá trị mỗi giây, nhưng sau 5 giây chúng ta muốn dừng việc này:
const observable = interval(1000);
const subscription = observable.subscribe((x) => console.log(x));
setTimeout(() => {
subscription.unsubscribe();
}, 5000);
Một Subscription cơ bản chỉ chứa một hàm unsubscribe()
để giải phóng tài nguyên hoặc hủy việc thực thi Observable.
Một Subscription có thể chứa nhiều Subscription con bên trong. Khi bạn hủy một Subscription cha, tất cả các Subscription con cũng sẽ được hủy.
Trong Subscription cha, bạn có thể sử dụng method add
để thêm các Subscription con phụ thuộc vào nó:
const foo = interval(500);
const bar = interval(700);
const subscription = foo.subscribe((x) => console.log("first: " + x));
const childSub = bar.subscribe((x) => console.log("second: " + x));
subscription.add(childSub);
setTimeout(() => {
// Hủy cả `subscription` và `childSub`
subscription.unsubscribe();
}, 2000);