RxJS Filtering Operators trong RxJS
- 27-12-2023
- Toanngo92
- 0 Comments
Mục lục
filter()
trong RxJS
Operator filter()
trong RxJS giống như việc lọc các giá trị từ một stream dựa trên một điều kiện nhất định. Khi một giá trị được phát ra từ Observable, nó sẽ được đưa vào hàm predicate
. Nếu kết quả của hàm này là true
(có nghĩa là giữa), giá trị đó sẽ được truyền đi, còn nếu là false
(có nghĩa là không giữ), giá trị sẽ bị loại bỏ.
Ví dụ, khi chúng ta có một Observable phát ra các số từ 1 đến 6:
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
from([1, 2, 3, 4, 5, 6])
.pipe(
filter((x) => x % 2 === 0) // Lọc các số chẵn
)
.subscribe(console.log); // Output: 2, 4, 6
Trong đoạn mã này, filter()
được sử dụng để lọc các số chẵn từ Observable. Mỗi giá trị sẽ được truyền vào hàm predicate
, và những số chẵn (số mà khi chia cho 2 dư 0) sẽ được emit ra trong subscription. Những số không chẵn sẽ bị loại bỏ khỏi dãy số ban đầu.
first() trong RxJS
first()
trong RxJS là một operator dùng để nhận và emit giá trị đầu tiên từ một Observable, sau đó kết thúc luồng dữ liệu. Khi không có giá trị nào được phát ra từ Observable trước khi nó hoàn thành, first()
sẽ throw một EmptyError
.
Ví dụ, với một Observable phát ra các số từ 1 đến 6:
import { from } from 'rxjs';
import { first } from 'rxjs/operators';
from([1, 2, 3, 4, 5, 6])
.pipe(first())
.subscribe(
(val) => console.log(val), // Output: 1
null,
() => console.log('complete')
);
Khi sử dụng first()
mà không truyền vào điều kiện, nó sẽ emit giá trị đầu tiên của Observable và sau đó complete. Trong trường hợp Observable rỗng, như of()
mà không nhận vào giá trị nào, first()
sẽ throw một EmptyError
.
first()
cũng có thể nhận một predicate để xác định giá trị nào đáp ứng điều kiện. Nếu không có giá trị nào thỏa mãn điều kiện, nó sẽ throw một Error. Nếu muốn tránh lỗi, có thể truyền vào giá trị mặc định (defaultValue) nếu không có giá trị nào thoả mãn điều kiện.
from([1, 2, 3, 4, 5, 6])
.pipe(first((x) => x > 3))
.subscribe(
(val) => console.log(val), // Output: 4
null,
() => console.log('complete')
);
from([1, 2, 3, 4, 5, 6])
.pipe(first((x) => x > 6))
.subscribe(
null,
(err) => console.log(err), // Error: Error
null
);
from([1, 2, 3, 4, 5, 6])
.pipe(first((x) => x > 6, 'defaultValue'))
.subscribe(
(val) => console.log(val), // Output: defaultValue
null,
() => console.log('complete')
);
Nói một cách ngắn gọn, first()
trong RxJS giúp bạn lấy giá trị đầu tiên từ Observable, hoặc emit giá trị đầu tiên thỏa mãn một điều kiện, hoặc trả về một giá trị mặc định nếu không tìm thấy giá trị nào thỏa mãn điều kiện.
last()
trong RxJS
trong RxJS hoạt động đối lập hoàn toàn với
last()first()
. Nó sẽ emit giá trị cuối cùng từ Observable trước khi nó hoàn thành. Tương tự như first()
, nếu Observable kết thúc mà không có giá trị nào được phát ra, nó sẽ throw một EmptyError
.
Ví dụ, với một Observable phát ra các số từ 1 đến 6:
import { from, of } from 'rxjs';
import { last } from 'rxjs/operators';
from([1, 2, 3, 4, 5, 6])
.pipe(last())
.subscribe(
(val) => console.log(val), // Output: 6
null,
() => console.log('complete')
);
of()
.pipe(last())
.subscribe(
null,
(err) => console.log(err), // Error: EmptyError
null
);
last()
cũng có thể nhận một hàm predicate tương tự như first()
. Nếu không có giá trị nào thỏa mãn điều kiện, nó sẽ throw một Error. Nếu muốn tránh lỗi, có thể truyền vào giá trị mặc định (defaultValue) nếu không có giá trị nào thoả mãn điều kiện.
from([1, 2, 3, 4, 5, 6])
.pipe(last((x) => x > 3))
.subscribe(
(val) => console.log(val), // Output: 6
null,
() => console.log('complete')
);
from([1, 2, 3, 4, 5, 6])
.pipe(last((x) => x > 6))
.subscribe(
null,
(err) => console.log(err), // Error: Error
null
);
from([1, 2, 3, 4, 5, 6])
.pipe(last((x) => x > 6, 'defaultValue'))
.subscribe(
(val) => console.log(val), // Output: defaultValue
null,
() => console.log('complete')
);
Tóm lại, last()
trong RxJS giúp bạn lấy giá trị cuối cùng từ Observable trước khi nó hoàn thành, hoặc emit giá trị cuối cùng thoả mãn điều kiện, hoặc trả về một giá trị mặc định nếu không tìm thấy giá trị nào thoả mãn điều kiện.
find()
trong RxJS
find()
trong RxJS hoạt động tương tự như Array.prototype.find()
trong JavaScript. Nó sẽ emit giá trị đầu tiên từ Observable thoả mãn điều kiện từ predicate và sau đó hoàn thành (complete).
Ví dụ, nếu chúng ta có một Observable phát ra các số từ 1 đến 6:
import { from } from 'rxjs';
import { find } from 'rxjs/operators';
from([1, 2, 3, 4, 5, 6])
.pipe(
find((x) => x % 2 === 0) // Tìm số chẵn
)
.subscribe(
(val) => console.log(val), // Output: 2
null,
() => console.log('complete')
);
find()
yêu cầu một predicate và sẽ không emit bất kỳ Error nào nếu không tìm thấy giá trị nào thoả mãn điều kiện. Nếu không có giá trị nào thoả mãn điều kiện, nó sẽ không emit giá trị nào và chỉ gửi signal hoàn thành (complete).
Tóm lại, find()
trong RxJS giúp bạn tìm và emit giá trị đầu tiên từ Observable mà thoả mãn điều kiện từ predicate, sau đó hoàn thành Observable.
single()
trong RxJS
single()
trong RxJS hoạt động tương tự như first()
, nhưng nghiêm ngặt hơn. Nó sẽ throw Error nếu có NHIỀU HƠN 1 giá trị thoả mãn điều kiện từ predicate.
Nếu bạn có một Observable phát ra các số từ 1 đến 3:
import { from } from 'rxjs';
import { single } from 'rxjs/operators';
from([1, 2, 3])
.pipe(single((x) => x > 1))
.subscribe(
(val) => console.log(val),
(err) => console.log(err), // Error: Sequence contains more than one element
() => console.log('complete')
);
Nếu điều kiện từ predicate của single()
thoả mãn cho NHIỀU hơn 1 giá trị, nó sẽ throw Error. Trong trường hợp bạn chỉ cần một giá trị duy nhất từ Observable, single()
sẽ giúp bạn tìm giá trị đó, nhưng nếu có nhiều hơn 1 giá trị thoả điều kiện, nó sẽ báo lỗi.
Nhớ rằng single()
cần phải có một điều kiện từ predicate. Nếu không có giá trị nào thoả mãn điều kiện hoặc nếu không có điều kiện nào được cung cấp, nó sẽ emit undefined
và không throw Error.
take()
trong RxJS
take()
trong RxJS cho phép bạn lấy một số lượng cụ thể các giá trị từ một Observable trước khi nó kết thúc.
Ví dụ, nếu bạn có một Observable phát ra các số từ 1 đến 4 và bạn muốn lấy chỉ 2 số đầu tiên:
import { from } from 'rxjs';
import { take } from 'rxjs/operators';
from([1, 2, 3, 4])
.pipe(take(2))
.subscribe(
(val) => console.log(val), // output: 1, 2
null,
() => console.log('complete') // complete
);
Ở đây, take(2)
chỉ cho phép lấy 2 giá trị đầu tiên từ Observable trước khi nó kết thúc. Điều này giúp bạn kiểm soát lượng dữ liệu mà bạn muốn xử lý từ Observable mà không cần phải theo dõi khi nào nó kết thúc.
Special case: take(1)
take(1)
là một trường hợp đặc biệt trong RxJS, cho phép bạn chỉ lấy một lần duy nhất từ một Observable trước khi nó kết thúc.
Khác với first()
, take(1)
không phát ra bất kỳ lỗi nào nếu Observable tự kết thúc mà không có giá trị nào được phát ra.
Khi nào nên sử dụng take(1)
?
- Khi bạn cần báo cáo rằng người dùng đã click ở đâu khi họ truy cập trang đầu tiên.
- Khi bạn muốn snapshot dữ liệu tại một thời điểm cụ thể từ Observable.
- Trong các trường hợp sử dụng Route Guard, khi bạn cần trả về một Observable.
takeLast()
trong RxJS
takeLast()
trong RxJS giống với take()
, nhưng ngược lại. Nó lấy các giá trị cuối cùng được phát ra từ Observable, thay vì lấy các giá trị đầu tiên.
Một điều quan trọng là takeLast()
chỉ phát ra giá trị khi Observable gốc kết thúc. Nếu Observable gốc là một Observable vô tận như interval()
, takeLast()
sẽ không bao giờ phát ra bất kỳ giá trị nào.
Ví dụ:
from([1, 2, 3, 4])
.pipe(takeLast(2))
.subscribe(console.log, null, () => console.log("complete"));
// Output: 3, 4 -> complete
Ở đây, takeLast(2)
lấy 2 giá trị cuối cùng từ Observable và sau đó phát chúng khi Observable gốc kết thúc.
takeUntil()
trong RxJS
trong RxJS rất hữu ích khi bạn muốn dừng việc phát ra các giá trị từ một Observable khi một sự kiện khác xảy ra. Nó nhận vào một Observable làm “người báo hiệu” (notifier), và khi notifier phát ra một giá trị,
takeUntil()takeUntil()
sẽ dừng việc phát ra giá trị từ Observable ban đầu.
Ví dụ, nếu bạn có một chuỗi các giá trị phát ra mỗi giây từ interval(1000)
và muốn ngừng khi người dùng click chuột:
interval(1000)
.pipe(takeUntil(fromEvent(document, "click")))
.subscribe(
console.log,
null,
() => console.log("complete")
);
Ở đây, interval(1000)
phát ra giá trị mỗi giây. Nhưng khi người dùng click chuột (sự kiện “click” trên toàn bộ tài liệu), Observable sẽ dừng và xuất thông báo “complete”.
Use-case trong Angular:
Trong Angular, khi bạn cần hủy đăng ký các Observable để tránh rò rỉ bộ nhớ hoặc xử lý dữ liệu không cần thiết khi Component không còn tồn tại nữa, takeUntil()
là một lựa chọn thông dụng.
Giả sử bạn có một destroySubject: Subject<void>
trong Component đại diện cho notifier. Khi ngOnDestroy()
được gọi, bạn gửi một tín hiệu thông qua destroySubject.next()
(emit) và sử dụng takeUntil(this.destroySubject)
trong Observable.
Khi ngOnDestroy()
được kích hoạt, việc này sẽ emit một tín hiệu thông qua destroySubject
, và bởi vì bạn đã sử dụng takeUntil(this.destroySubject)
trong Observable, nó sẽ tự động hủy đăng ký Observable đó. Điều này giúp tránh rò rỉ bộ nhớ và dọn dẹp các tài nguyên không cần thiết khi Component bị hủy bỏ.
takeWhile()
trong RxJS
Trong RxJS, takeWhile()
hoạt động tương tự nhưng không giống hoàn toàn takeUntil()
. Thay vì nhận vào một notifier như takeUntil()
, takeWhile()
sử dụng một predicate.
Nhiều người sẽ sử dụng takeWhile()
và takeUntil()
khá linh hoạt và đôi khi chuyển đổi giữa chúng, nhưng chúng có những sự khác biệt quan trọng. Một số bài viết trong nhóm đã thảo luận về điều này: post 1 và post 2.
Ví dụ, nếu bạn sử dụng interval(1000).pipe(takeWhile((x) => x < 6))
, nó sẽ chỉ emit các giá trị mà thỏa mãn điều kiện x < 6
. Trong trường hợp này, output sẽ là 0, 1, 2, 3, 4, 5
rồi sau đó complete.
takeWhile()
thường được sử dụng khi bạn muốn ngừng đăng ký Observable từ chính các giá trị mà nó emit (internal). Ví dụ như trong đoạn mã trên, chúng ta kiểm tra điều kiện trực tiếp trên các giá trị mà interval
emit.
Trái lại, takeUntil()
thường được sử dụng khi bạn có một thông báo từ bên ngoài (external), một “người báo tin” để ngừng Observable.
skip()
trong RxJS
Tưởng tượng như skip()
giống như “bỏ qua” một số lượng giá trị đầu tiên mà Observable emit trước khi bắt đầu lắng nghe và emit các giá trị sau đó. Điều này tương tự như việc bạn bỏ qua một số lượng bước đầu tiên khi đi bộ hoặc làm bất kỳ điều gì.
Ví dụ, khi bạn sử dụng from([1, 2, 3, 4]).pipe(skip(1))
, nó sẽ bỏ qua phần tử đầu tiên 1
và emit các giá trị 2, 3, 4
sau đó, và sau cùng là complete.
skipUntil()
trong RxJS
Tưởng tượng như skipUntil()
là việc “bỏ qua” giá trị emit từ Observable cho đến khi một sự kiện nào đó xảy ra. Đây giống như việc bạn chờ đợi cho đến một điểm gì đó xảy ra trước khi bắt đầu quan sát hoặc xử lý các thông tin.
Ví dụ, khi bạn sử dụng interval(1000).pipe(skipUntil(fromEvent(document, "click")))
, Observable sẽ bắt đầu emit giá trị từ interval
sau khi nhấp chuột vào bất kỳ thời điểm nào sau khi chờ đợi. Điều này tạo ra kết quả là giá trị đầu tiên emit từ interval
sẽ là giá trị tại thời điểm sau khi bạn click chuột.
skipWhile()
trong RxJS
skipWhile()
giống như việc bạn đang “bỏ qua” các giá trị được emit từ Observable cho đến khi điều kiện kiểm tra không còn đúng nữa. Nó giống như việc bạn muốn bỏ qua các giá trị không thoả mãn điều kiện nào đó trước khi bắt đầu quan sát hoặc xử lý các giá trị tiếp theo.
Ví dụ, khi bạn sử dụng interval(1000).pipe(skipWhile((x) => x < 5))
, nó sẽ bắt đầu emit giá trị từ interval
sau khi giá trị đầu tiên không thỏa mãn điều kiện (trong trường hợp này, khi giá trị là 5). Sau đó, nó sẽ tiếp tục emit các giá trị tiếp theo sau giá trị 5.
distinct()
trong RxJS
distinct()
là như việc bạn muốn chỉ nhận những giá trị duy nhất mà Observable emit, nghĩa là nó sẽ chỉ cho phép các giá trị không trùng lặp đi qua. Ví dụ, nếu bạn có một danh sách các số và muốn nhận chỉ các số không lặp lại từ Observable, bạn có thể sử dụng distinct()
để lọc ra những giá trị duy nhất từ danh sách đó.
Ví dụ: from([1, 2, 3, 4, 5, 5, 4, 3, 6, 1]).pipe(distinct())
sẽ chỉ emit các giá trị không trùng lặp từ danh sách ban đầu, tức là nó sẽ chỉ lấy mỗi giá trị một lần duy nhất.
Ngoài ra, nếu bạn có một danh sách các đối tượng phức tạp và muốn so sánh dựa trên một thuộc tính cụ thể của đối tượng, bạn có thể sử dụng distinct((p) => p.name)
để chỉ lấy các đối tượng với thuộc tính name
duy nhất mà Observable emit.
distinctUntilChanged()
trong RxJS
distinctUntilChanged()
là như việc bạn muốn nhận các giá trị mới nhất và khác biệt so với giá trị trước đó mà Observable emit. Nó chỉ emit giá trị khi giá trị đó khác biệt so với giá trị ngay trước đó mà Observable đã emit.
Ví dụ: from([1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4]).pipe(distinctUntilChanged())
sẽ chỉ emit các giá trị khác biệt so với giá trị ngay trước đó, nghĩa là nó chỉ lấy các giá trị mới mà khác với giá trị ngay trước nó.
Ngoài ra, nếu bạn muốn so sánh dựa trên một thuộc tính cụ thể của đối tượng, bạn có thể sử dụng tham số keySelector
hoặc truyền vào một compare function
để xác định khi nào giá trị được coi là khác biệt. Khi compare function
không được truyền vào, distinctUntilChanged()
sẽ sử dụng ===
để so sánh giá trị. Nếu bạn muốn thay đổi cách so sánh này, bạn có thể truyền vào một compare function
mà khi trả về truthy
, distinctUntilChanged()
sẽ bỏ qua giá trị đó.
distinctUntilKeyChanged()
trong RxJS
Cái này giống như bạn muốn lấy các giá trị mới mà có sự khác biệt với giá trị trước đó trong Observable, nhưng thay vì so sánh toàn bộ giá trị, nó chỉ quan tâm đến một phần của giá trị, cụ thể là một key
trong đối tượng.
Ví dụ, khi bạn sử dụng distinctUntilKeyChanged('name')
trên một Observable chứa các đối tượng có thuộc tính ‘name’, nó sẽ chỉ lấy các đối tượng có thuộc tính ‘name’ khác nhau so với đối tượng ngay trước đó.
of(
{ age: 4, name: "Foo" },
{ age: 6, name: "Foo" },
{ age: 7, name: "Bar" },
{ age: 5, name: "Foo" }
)
.pipe(distinctUntilKeyChanged("name"))
.subscribe(console.log, null, () => console.log("complete"));
// Output: { age: 4, name: "Foo" }, { age: 7, name: "Bar" }, { age: 5, name: "Foo" } -> complete
Điều này giúp bạn lọc các đối tượng trong Observable dựa trên sự thay đổi của một thuộc tính cụ thể (‘name’ trong ví dụ này).
Note
Còn 8 operators khác cũng có chức năng tương tự nhau theo cặp như throttle/throttleTime
, debounce/debounceTime
… Trong đó, mình chỉ sẽ tập trung giải thích về *Time operators vì chúng có thêm tham số thời gian.
throttle()
nhận vào một Observable làm đối tượng điều khiển, trong khi throttleTime()
nhận vào một khoảng thời gian cụ thể tính bằng millisecond. Thông thường, trong các tác vụ hàng ngày, các operators có thêm *Time thường được sử dụng nhiều hơn so với phiên bản không có *Time. Điều này giúp kiểm soát tần suất hoạt động của Observable theo thời gian một cách linh hoạt và dễ dàng.
throttle()/throttleTime()
trong RxJS
Throttle(): Throttle nhận vào một Observable làm đầu vào để xác định thời gian chờ. Khi Observable này emit hoặc complete, thì timer sẽ được kích hoạt. Throttle sẽ bắt đầu chờ đợi và chỉ emit giá trị tiếp theo từ Observable gốc sau khi timer này hoàn thành.
ThrottleTime(): ThrottleTime(duration) đơn giản là chờ một khoảng thời gian cụ thể (milliseconds) trước khi cho phép giá trị tiếp theo từ Observable gốc được emit. Sau khi giá trị đầu tiên được emit, throttleTime sẽ đặt một khoảng thời gian chờ, trong thời gian này, mọi giá trị tiếp theo từ Observable gốc sẽ bị bỏ qua. Khi khoảng thời gian này kết thúc, throttleTime sẽ chờ và emit giá trị tiếp theo từ Observable.
fromEvent(document, "mousemove")
.pipe(throttleTime(1000))
.subscribe(console.log, null, () => console.log("complete"));
// Khi di chuyển chuột (mousemove), chỉ có giá trị đầu tiên được emit sau mỗi khoảng 1 giây
ThrottleTime có thể được tùy chỉnh bằng cách sử dụng ThrottleConfig: {leading: boolean, trailing: boolean}
. Tham số này xác định liệu throttleTime sẽ emit giá trị đầu tiên hay cuối cùng sau khi timer kết thúc. Mặc định là {leading: true, trailing: false}
.
ThrottleTime thường được sử dụng trong các trường hợp cần kiểm soát lượng event emit từ các sự kiện DOM như mousemove, giúp tránh việc emit quá nhiều event khi di chuyển chuột.
debounce()/debounceTime()
trong RxJS
Debounce(): Debounce nhận vào một Observable làm đầu vào để xác định thời gian chờ. Khi Observable này emit hoặc complete, timer sẽ được kích hoạt. Sau mỗi lần timer này kết thúc, debounce sẽ emit giá trị cuối cùng từ Observable gốc.
DebounceTime(): DebounceTime(dueTime) chờ một khoảng thời gian cụ thể (milliseconds) trước khi chấp nhận giá trị tiếp theo từ Observable gốc. Mọi giá trị trước đó sẽ bị bỏ qua. Khi một giá trị mới được emit, debounceTime sẽ thiết lập một khoảng thời gian chờ mới và chỉ khi khoảng thời gian này kết thúc, giá trị cuối cùng từ Observable gốc mới được emit.
this.filterControl.valueChanges.pipe(debounceTime(500)).subscribe(console.log);
// Mỗi lần người dùng nhập vào input, nó sẽ chờ 500ms trước khi emit giá trị cuối cùng
DebounceTime thường được sử dụng cho các trường hợp input dùng để lọc danh sách hoặc tìm kiếm, giúp tránh việc gửi quá nhiều request hoặc update khi người dùng đang nhập.
audit()/auditTime()
trong RxJS
Audit(): Hàm Audit nhận vào một Observable làm đầu vào để xác định thời gian chờ. Khi Observable này emit hoặc complete, timer sẽ được kích hoạt. Sau mỗi lần timer này kết thúc, Audit sẽ emit giá trị gần nhất từ Observable gốc.
AuditTime(): AuditTime(dueTime) chờ một khoảng thời gian cụ thể (milliseconds) trước khi chấp nhận giá trị tiếp theo từ Observable gốc. Khi một giá trị mới được emit, auditTime sẽ thiết lập một khoảng thời gian chờ mới và chỉ khi khoảng thời gian này kết thúc, giá trị gần nhất từ Observable gốc mới được emit.
fromEvent(document, "click").pipe(auditTime(1000)).subscribe(console.log);
// Sau khi click, sẽ chờ 1 giây, sau đó emit giá trị gần nhất từ click
AuditTime thường được sử dụng khi cần lấy giá trị gần nhất hoặc quan trọng nhất từ một loạt sự kiện, ví dụ như click hay keyboard input.
sample()/sampleTime()
trong RxJS
Sample(): Sample nhận vào một Observable notifier làm tín hiệu. Khi notifier này emit, sample sẽ lấy giá trị gần nhất từ Observable gốc và emit nó.
SampleTime(): SampleTime(period) chờ một khoảng thời gian cụ thể (milliseconds) trước khi lấy giá trị gần nhất từ Observable gốc và emit nó. Khoảng thời gian này được thiết lập bởi period và sau khi kết thúc, sampleTime sẽ lấy giá trị gần nhất từ Observable.
fromEvent(document, "click").pipe(sampleTime(1000)).subscribe(console.log);
// Khi click, đợi 1 giây, sau đó lấy giá trị gần nhất từ click và emit nó
Cả Sample và SampleTime thường được sử dụng khi cần lấy giá trị gần nhất từ một chuỗi sự kiện, ví dụ như lấy giá trị cuối cùng khi thực hiện click hoặc input sau một khoảng thời gian nhất định.