RxJS Transformation Operators trong RxJS
- 26-12-2023
- Toanngo92
- 0 Comments
Mục lục
Pipeable Operators trong Angular
Các Pipeable Operator trong Angular là những hàm nhận một Observable và trả về một Observable khác mà không làm thay đổi Observable ban đầu. Chúng hoạt động theo cơ chế pure: Observable đầu vào không bị ảnh hưởng.
Cấu trúc sử dụng của chúng như sau:
const returnObservable = observableInstance.pipe(operator1(), operator2());
Dù có bao nhiêu Pipeable Operator điều được thêm vào sau pipe()
, Observable ban đầu (observableInstance
) vẫn không bị thay đổi. Kết quả cuối cùng là một Observable mới, và để sử dụng nó, ta cần gán nó lại vào một biến hoặc thực hiện việc subscribe ngay sau khi sử dụng pipe.
Trước đây, trước RxJS version 5.5, người dùng thường sử dụng chuỗi method prototype để thực hiện tương tự. Tuy nhiên, từ phiên bản 5.5 trở lên, việc sử dụng pipe operators được khuyến nghị hơn.
Pipeable Operators được phân loại thành nhiều loại khác nhau, và trong phạm vi hôm nay, chúng ta sẽ tập trung tìm hiểu về Transformation Operators.
Transformation Operators
Sử dụng Array trong JavaScript, chúng ta thường lặp qua từng phần tử trong mảng và áp dụng một hàm cho mỗi phần tử, sau đó tạo một mảng mới dựa trên kết quả của việc áp dụng hàm đó. Ví dụ như này:
const users = [
{
id: "ddfe3653-1569-4f2f-b57f-bf9bae542662",
username: "tiepphan",
firstname: "tiep",
lastname: "phan",
},
{
id: "34784716-019b-4868-86cd-02287e49c2d3",
username: "nartc",
firstname: "chau",
lastname: "tran",
},
];
const usersVm = users.map((user) => {
return {
...user,
fullname: `${user.firstname} ${user.lastname}`,
};
});
Tương tự, với Observable, chúng ta cũng có thể thực hiện một quá trình tương tự như map
trên mảng. Giả sử chúng ta theo dõi người dùng đăng nhập vào hệ thống. Mỗi lần có người đăng nhập, hệ thống sẽ gửi một sự kiện cho chúng ta biết. Bằng cách sử dụng map
từ RxJS, ta có thể thực hiện việc tương tự như trên mảng Array:
import { Observable } from "rxjs";
import { map } from "rxjs/operators";
interface User {
id: string;
username: string;
firstname: string;
lastname: string;
}
const source = new Observable<User>((observer) => {
const users = [
{
id: "ddfe3653-1569-4f2f-b57f-bf9bae542662",
username: "tiepphan",
firstname: "tiep",
lastname: "phan",
},
{
id: "34784716-019b-4868-86cd-02287e49c2d3",
username: "nartc",
firstname: "chau",
lastname: "tran",
},
];
setTimeout(() => {
observer.next(users[0]);
}, 1000);
setTimeout(() => {
observer.next(users[1]);
observer.complete();
}, 3000);
});
const observer = {
next: (value) => console.log(value),
error: (err) => console.error(err),
complete: () => console.log("hoàn thành"),
};
source.subscribe(observer);
Khi chạy chương trình, sau 1 giây sẽ xuất ra người dùng đầu tiên, sau đó sau 2 giây nữa sẽ xuất ra người dùng thứ hai, kèm theo tín hiệu hoàn thành.
map trong RxJS
Cách sử dụng map
trong RxJS tương tự như khi sử dụng map
với mảng Array trong JavaScript. map
trong RxJS là một Operator, cho phép chúng ta thay đổi hoặc biến đổi dữ liệu mà Observable gửi đi mà không làm thay đổi Observable ban đầu.
Giả sử chúng ta muốn hiển thị thông tin fullname
của người dùng khi dữ liệu được gửi đi, ta có thể sử dụng map
để thực hiện điều này:
import { map } from "rxjs/operators";
source
.pipe(
map((user) => {
return {
...user,
fullname: `${user.firstname} ${user.lastname}`,
};
})
)
.subscribe(observer);
Ngoài ra, nếu yêu cầu thay đổi, chúng ta chỉ cần trả về id
của người dùng mỗi khi có dữ liệu được gửi đi:
source.pipe(map((user) => user.id)).subscribe(observer);
Cách sử dụng map
trong RxJS rất gần giống với cách sử dụng map
với mảng Array ở trên. Cả hai đều cho phép chúng ta thực hiện một thao tác biến đổi trên từng phần tử của dữ liệu ban đầu mà không thay đổi cấu trúc của dữ liệu đó.
pluck trong RxJS
pluck
trong RxJS giúp bạn lấy ra một property cụ thể từ một object trong dữ liệu mà Observable gửi đi một cách dễ dàng hơn. Ví dụ, nếu bạn chỉ muốn lấy ra id
từ mỗi object được gửi đi:
import { pluck } from "rxjs/operators";
source.pipe(pluck("id")).subscribe(observer);
Khi bạn sử dụng pluck("id")
, nó sẽ trích xuất giá trị của property id
từ mỗi object trong dữ liệu mà Observable gửi đi, và sẽ emit giá trị này tới subscriber. Điều này giúp bạn rút ngắn và tối ưu hóa mã lệnh, thay vì phải sử dụng một hàm map
dài dòng để chỉ lấy ra một property cụ thể từ mỗi object.
mapTo trong RxJS
mapTo
trong RxJS cho phép bạn luôn trả về một giá trị cố định mỗi khi stream emit một giá trị mới. Đây rất hữu ích khi bạn muốn thay đổi dữ liệu stream thành các giá trị cố định.
Ví dụ, giả sử bạn muốn tạo một chức năng để theo dõi sự kiện “mouse hover” trên một phần tử. Khi bạn di chuyển chuột vào phần tử đó, chúng ta muốn stream trả về true
, và khi rời chuột ra khỏi phần tử đó, stream trả về false
. Bạn có thể sử dụng mapTo
để làm điều này:
import { fromEvent, merge } from "rxjs";
import { mapTo } from "rxjs/operators";
const element = document.querySelector("#hover");
const mouseover$ = fromEvent(element, "mouseover");
const mouseleave$ = fromEvent(element, "mouseleave");
const hover$ = merge(
mouseover$.pipe(mapTo(true)),
mouseleave$.pipe(mapTo(false))
);
hover$.subscribe(observer);
Với mapTo
, chúng ta có thể dễ dàng thay đổi giá trị emit từ mouseover$
và mouseleave$
thành true
hoặc false
tương ứng, tạo ra một stream mới để theo dõi sự kiện “mouse hover” trên phần tử đó.
scan trong RxJS
scan
trong RxJS tương tự như việc sử dụng reduce
trong JavaScript khi làm việc với mảng. Nó cho phép bạn áp dụng một hàm lên mỗi giá trị emit từ stream, đồng thời lưu trữ kết quả tích lũy từ các giá trị trước đó.
Ví dụ, để đếm số lần click vào một nút như trước, bạn có thể sử dụng scan
như sau:
import { fromEvent } from "rxjs";
import { scan } from "rxjs/operators";
const button = document.querySelector("#add");
const click$ = fromEvent(button, "click");
click$.pipe(scan((acc) => acc + 1, 0)).subscribe(observer);
Ở đây, scan
được sử dụng để tính số lần click vào nút và lưu trữ kết quả tích lũy (accumulator) trong biến acc
.
Tiếp theo, nếu bạn muốn đếm tổng số bài đăng của những người dùng đăng nhập theo thời gian, bạn có thể thực hiện như sau:
import { Observable } from "rxjs";
import { scan } from "rxjs/operators";
const users$ = new Observable<User>((observer) => {
// ... (đoạn code khởi tạo users)
users.forEach((user) => observer.next(user));
observer.complete();
});
users$
.pipe(scan((acc, curr) => acc + curr.postCount, 0))
.subscribe(observer);
Ở đây, scan
được dùng để tính tổng số bài đăng của từng người dùng và tích lũy kết quả vào biến acc
. Kết quả tích lũy này sẽ được cập nhật mỗi khi một người dùng mới được emit từ stream users$
.
reduce trong RxJS
reduce
trong RxJS hoạt động tương tự như scan
nhưng có một điểm khác biệt quan trọng. Nó tương tự như việc sử dụng Array.reduce
trong JavaScript khi làm việc với mảng.
Operator này cũng thực hiện việc tích lũy giá trị theo thời gian, nhưng khác với scan
, reduce
sẽ đợi cho đến khi source hoàn thành (complete) trước khi nó emit ra một giá trị cuối cùng và gửi tín hiệu complete.
Ví dụ, nếu chúng ta muốn tính tổng số bài đăng của các người dùng và chỉ muốn có kết quả duy nhất khi source hoàn thành:
import { Observable } from "rxjs";
import { reduce } from "rxjs/operators";
const users$ = new Observable<User>((observer) => {
// ... (đoạn code khởi tạo users)
users.forEach((user) => observer.next(user));
observer.complete();
});
users$
.pipe(reduce((acc, curr) => acc + curr.postCount, 0))
.subscribe(observer);
Ở đây, reduce
sẽ tích lũy và tính tổng số bài đăng của từng người dùng, nhưng nó sẽ chỉ emit ra kết quả cuối cùng khi source stream (users$
) hoàn thành. Điều này đảm bảo rằng chúng ta chỉ nhận được một kết quả duy nhất khi toàn bộ dữ liệu đã được xử lý xong.
toArray trong RxJS
Operator toArray
trong RxJS rất hữu ích khi bạn muốn thu thập toàn bộ các giá trị được phát ra bởi một stream và lưu chúng thành một mảng. Khi stream hoàn thành, nó sẽ emit một mảng chứa tất cả các giá trị đã được thu thập và sau đó gửi tín hiệu hoàn thành.
Nếu bạn đã quen thuộc với reduce
, bạn có thể nhận thấy rằng toArray
thực tế là một cách ngắn gọn hơn để thực hiện công việc tương tự.
Chẳng hạn, nếu bạn muốn thu thập tất cả các giá trị postCount
của người dùng thành một mảng khi stream hoàn thành:
import { Observable } from "rxjs";
import { toArray } from "rxjs/operators";
const users$ = new Observable<User>((observer) => {
// ... (khởi tạo dữ liệu người dùng)
users.forEach((user) => observer.next(user));
observer.complete();
});
users$.pipe(pluck("postCount"), toArray()).subscribe(observer);
Ở đây, toArray
sẽ thu thập tất cả các giá trị postCount
thành một mảng khi stream users$
hoàn thành. Cách này giúp mã của bạn trở nên ngắn gọn và dễ đọc hơn so với việc sử dụng reduce
để thực hiện công việc tương tự.
buffer trong RxJS
Chức năng của operator buffer
trong RxJS là thu thập các giá trị được phát ra từ một stream và chờ đợi cho đến khi một closingNotifier
(thông báo kết thúc) phát ra tín hiệu, sau đó nó sẽ tổ chức các giá trị đó thành một mảng và emit ra kết quả.
Ví dụ, nếu chúng ta muốn lưu trữ các giá trị emit ra từ một stream interval và chờ đợi cho đến khi sự kiện click xảy ra để tổ chức các giá trị đó thành một mảng:
import { interval, fromEvent } from "rxjs";
import { buffer } from "rxjs/operators";
const interval$ = interval(1000); // Tạo một stream phát ra giá trị sau mỗi giây
const click$ = fromEvent(document, "click"); // Tạo một stream cho sự kiện click trên document
const buffer$ = interval$.pipe(buffer(click$)); // Sử dụng buffer để tổ chức giá trị từ interval$ khi click$ emit
const subscribe = buffer$.subscribe((val) =>
console.log("Buffered Values: ", val)
);
Khi bạn chạy đoạn mã này và click vào trang, nó sẽ thu thập các giá trị được phát ra từ stream interval$
và sau mỗi lần nhấp chuột, nó sẽ tổ chức các giá trị đó thành một mảng và emit ra dưới dạng “Buffered Values”. Điều này giúp tổ chức và xử lý các giá trị dễ dàng dựa trên sự kiện hoặc điều kiện bạn xác định.
bufferTime trong RxJS
Operator bufferTime
trong RxJS là một cách để lưu trữ các giá trị được phát ra từ một stream trong một khoảng thời gian nhất định. Nó tổ chức các giá trị này thành các mảng và emit chúng sau mỗi khoảng thời gian bufferTimeSpan
(được đo bằng miligiây).
Ví dụ, nếu chúng ta có một stream phát ra giá trị sau mỗi 500ms và chúng ta sử dụng bufferTime(2000)
:
import { interval } from "rxjs";
import { bufferTime } from "rxjs/operators";
const source = interval(500); // Stream phát ra giá trị sau mỗi 500ms
const bufferTimeOperator = source.pipe(bufferTime(2000)); // Sử dụng bufferTime với khoảng thời gian 2000ms
const bufferTimeSub = bufferTimeOperator.subscribe(
(val) => console.log('Buffered with Time:', val)
);
Khi chúng ta chạy đoạn mã này, nó sẽ thu thập các giá trị phát ra từ stream source
sau mỗi khoảng thời gian 2 giây (tức là trong mỗi khung thời gian 2 giây), tổ chức chúng thành các mảng và emit ra dưới dạng “Buffered with Time”. Điều này giúp tổ chức dữ liệu theo khoảng thời gian và xử lý chúng một cách linh hoạt hơn.