Rxjs và Reactive programming – chi tiết về ý nghĩa và cách hoạt động

0
45
Rate this post
Video rxjs la gi

Chào các bạn. Nếu bạn đã từng nghiên cứu hoặc tham gia vào một dự án liên quan đến Angular, khả năng cao bạn đã từng làm việc hoặc ít nhất đã nghe đến thư viện RxJS. Chắc chắn khi chỉ nghe tên thư viện này, nhiều người đã thấy khá mệt mỏi vì tính phức tạp của nó. Hiện tại, tôi đã có một thời gian làm việc với RxJS, vì vậy hôm nay tôi sẽ chia sẻ với mọi người một số kiến thức và kinh nghiệm của mình về các khái niệm trong thư viện này.

1. Lập trình Reactive

Điều quan trọng đầu tiên trước khi bạn sử dụng RxJS một cách hiệu quả là hiểu rõ về khái niệm lập trình Reactive. Lập trình Reactive là một thuật ngữ chỉ một phương pháp phân tích logic mới. Hãy xem hình ảnh dưới đây để hiểu rõ hơn.

Hình ảnh

Trong lập trình Reactive, dữ liệu sẽ được chứa trong một luồng (stream). Bạn có thể tưởng tượng một luồng giống như một băng truyền như trong hình ảnh. Khi luồng nhận vào một gói dữ liệu mới, gói dữ liệu đó sẽ được luồng chuyển đến các bộ sửa đổi (modifier). Các bộ sửa đổi này là các hàm, nhiệm vụ của chúng là phản ứng (react) với các gói dữ liệu được đưa vào luồng, thay đổi các gói dữ liệu đó và trả lại cho luồng các gói dữ liệu vừa được thay đổi.

Như bạn có thể thấy, các bộ sửa đổi không chạy theo cách bị động, mà chúng sẽ tự động chạy, phản ứng với mỗi gói dữ liệu được luồng truyền vào. Điều này chính là lý do vì sao mô hình này được gọi là lập trình Reactive.

Nhờ vào việc các bộ sửa đổi tự động phản ứng với các gói dữ liệu được luồng truyền vào, chúng ta không cần phải chạy các bộ sửa đổi bằng tay. Do đó, luồng hoạt động của mô hình này rất dễ hiểu và theo dõi. Một đặc điểm cực kỳ hữu ích của mô hình này là chúng ta có thể xác định thứ tự của các bộ sửa đổi trước khi một luồng chạy, tùy thuộc vào thứ tự của các bộ sửa đổi mà các gói dữ liệu trả về sẽ khác nhau. Điều này giúp cho luồng hoạt động của luồng trở nên rõ ràng, có cấu trúc và dễ debug.

Khi áp dụng vào thực tế, lập trình Reactive có thể được sử dụng trong rất nhiều trường hợp, ví dụ như method setInterval. Chúng ta có thể coi rằng mỗi khi setInterval được chạy, nó sẽ tạo ra một luồng (stream). Như ví dụ dưới đây, luồng mà method setInterval tạo ra sẽ nhận vào một gói dữ liệu có giá trị là undefined sau mỗi 1 giây và callback function mà ta truyền vào trong setInterval sẽ có vai trò như là một bộ sửa đổi.

setInterval(data => {
  console.log('Gói dữ liệu được nhận là: ', data);
}, 1000);

Tương tự như vậy, chúng ta cũng có thể coi như rằng method setTimeout sẽ tạo ra một luồng, tuy nhiên luồng này sẽ dừng lại ngay sau khi callback function (bộ sửa đổi) ta truyền vào trong setTimeout hoàn thành.

setTimeout(data => {
  console.log('Gói dữ liệu được nhận là: ', data);
}, 1000);

Sự kiện cũng có thể được coi là các luồng. Ví dụ như sự kiện click, mỗi khi chúng ta bấm click, một gói dữ liệu chứa thông tin về lần click đó sẽ được truyền vào luồng, và callback của sự kiện click sẽ là các bộ sửa đổi xử lý các gói dữ liệu đó.

document.onclick = function(evt) {
  console.log('Gói dữ liệu được nhận là: ', evt);
}

2. RxJS

Vậy là mình đã giới thiệu xong với các bạn về lập trình Reactive. Phần lớn các bài viết trên mạng về mô hình này thường giới thiệu rất chung chung và khó hiểu. Vì vậy, mong rằng phần giới thiệu ở trên có thể mang đến cho các bạn một cái nhìn mới, dễ hiểu hơn về lập trình Reactive. Bây giờ chúng ta hãy cùng xem cách RxJS hoạt động.

2.1 RxJS là gì

RxJS chỉ là một thư viện giúp chúng ta mô hình hoá lập trình Reactive để có thể áp dụng nó vào thực tế một cách dễ dàng.

2.2 Các thuật ngữ trong RxJS

Trước khi chúng ta đi sâu vào RxJS, hãy đọc qua một số thuật ngữ chính trong thư viện này.

  • Observable: là một đối tượng được dùng để tạo các đối tượng tạm thời (observable instances). Tạm coi Observable như một lớp, và các đối tượng của lớp này chính là các luồng (stream).

  • Executor: nếu Observable là một lớp thì executor chính là phần logic hoặc phương thức khởi tạo của lớp đó. Executor giúp ta xác định cách mà một luồng sẽ hoạt động (chi tiết được giải thích ở dưới).

  • Observer: là một đối tượng có 3 phương thức, mỗi phương thức sẽ chạy dựa trên trường hợp khác nhau:

    • next: phương thức này sẽ chạy (phản ứng) mỗi khi nó nhận được một gói dữ liệu.
    • error: phương thức này sẽ chạy (phản ứng) khi luồng theo nó gặp lỗi, phương thức này nhận vào một tham số là lỗi mà luồng gặp phải.
    • complete: phương thức này sẽ chạy (phản ứng) khi logic của luồng theo nó kết thúc (chi tiết sẽ được giải thích sau).
  • Subscribe: Nếu Observable là một lớp thì subscribe chính là từ khoá “new”. Khi được gọi, phương thức này sẽ tạo ra một luồng dựa trên Observable mà nó được gọi và chạy luồng đó.

  • Subscription: là một đối tượng trả về từ phương thức subscribe, đối tượng này có một số phương thức được sử dụng để điều khiển quá trình hoạt động của executor.

  • Operator: chính là các bộ sửa đổi, nhưng chúng cũng đóng vai trò là người vận chuyển các gói dữ liệu đến luồng.

2.3 Cách Observable hoạt động

Ok, đến đây, có thể bạn đang thắc mắc tại sao mình lại giải thích cách thức hoạt động của Observable mà không phải cách RxJS hoạt động. Lý do là RxJS là một hệ sinh thái bao gồm một số công cụ chính giúp chúng ta mô phỏng lại lập trình Reactive, và một trong số chúng là Observable.

Để giúp mọi người hiểu cách Observable hoạt động, mình đã tạo một Observable mô phỏng lại phương thức setInterval với thời gian chạy cố định là 1 giây.

import { Observable } from "rxjs";

const interval$ = new Observable(observer => {
  let intervalCounter = 0;
  const intervalInstance = setInterval(() => {
    intervalCounter++;
    observer.next(intervalCounter);
    if(intervalCounter === 3) {
      clearInterval(intervalInstance);
      observer.error('error');
      observer.complete();
    }
  }, 1000);
});

const observer = {
  next: intervalCounter => {
    console.log(intervalCounter);
  },
  error: error => {
    console.error(error);
  },
  complete: () => {
    console.log('complete');
  }
}

interval$.subscribe(observer);
  • Đầu tiên, dấu $ ở cuối biến interval$ là một quy ước khi đặt tên biến chứa một Observable.

  • Constructor Observable được dùng để tạo ra các đối tượng Observable. Như mình đã đề cập ở trên, các đối tượng Observable có thể coi như các lớp (class) của các luồng (stream), và mỗi khi phương thức subscribe của một Observable được gọi, một luồng sẽ được tạo ra và chạy ngay lập tức cho đến khi luồng này hoàn thành quá trình chạy.

  • Constructor này nhận vào một hàm (executor) và truyền vào trong hàm đó một observer, 3 phương thức của object observer này là next, complete, error. Và chúng chính là 3 phương thức next, complete, error ta định nghĩa trong object observer và truyền vào trong phương thức subscribe.

  • Đi sâu hơn về executor, một executor chính là cách mà một luồng hoạt động. Hơi khó hiểu phải không? Tuy nhiên, nếu bạn coi cách hoạt động của một phương thức setInterval là một luồng như ví dụ ở phần giới thiệu về lập trình Reactive, bạn có thể thấy executor chỉ là một hàm được dùng để chạy phương thức setInterval, điều duy nhất ngoài việc gọi phương thức setInterval mà executor làm là chạy các phương thức next, complete, error của object observer mà nó nhận được (đây chính là lúc các operator này đóng vai trò là những người vận chuyển dữ liệu). Mỗi khi phương thức subscribe của một Observable được chạy, executor của Observable đó sẽ được chạy và mỗi khi một executor được chạy, một luồng sẽ được tạo ra. Okay, bây giờ bạn có thể từ bỏ việc coi Observable như là một lớp của các luồng rồi đấy.

  • Trái ngược với luồng hay Observable, Observer khá dễ hiểu. Nếu bạn đọc kỹ đoạn mã ở trên và thử nghiệm, object observer được truyền vào trong executor chính là object observer mà bạn truyền vào trong phương thức subscribe. Thực chất, mình đang gọi phương thức next của object observer mà mình truyền vào trong subscribe khi viết: observer.next(intervalCounter);. Chính là cách các operator được chạy, và khi chúng được chạy, chúng sẽ đóng vai trò là các operator. Bạn có thể thấy mình cố tình gọi phương thức error: observer.error('error'); khi setInterval chạy đến lần thứ 3.

2.3.1 Dừng chạy và hoàn thành một Observable

Như mình đã đề cập ở mục 2.2, Subscription có một số phương thức được sử dụng để điều khiển quá trình chạy của một luồng, và phương thức quan trọng nhất và được sử dụng phổ biến nhất là unsubscribe.

const subscription = interval$.subscribe(observer);

setTimeout(() => {
  subscription.unsubscribe();
}, 2000);

Khi bạn thay đoạn subscribe ví dụ ở phần 2.3 bằng ví dụ trên, bạn sẽ thấy executor của chúng ta chỉ in log ra inspector đúng một lần. Đó là vì chúng ta đã dừng executor.

Để bắt sự kiện khi một executor dừng chạy, ta có thể trả về một hàm ngay trong executor, hàm này sẽ được class Observable gọi khi phương thức unsubscribe của một Observable được gọi. Việc bắt sự kiện này có lẽ không phổ biến lắm, nhưng trong một số trường hợp cụ thể như ví dụ dưới đây, nó rất quan trọng vì nó sẽ giúp chúng ta dừng hàm setInterval khi ta không cần dùng Observable interval$ nữa.

import { Observable } from "rxjs";

const interval$ = Observable.create(observer => {
  let intervalCounter = 0;
  const intervalInstance = setInterval(() => {
    intervalCounter++;
    observer.next(intervalCounter);
  }, 1000);

  // function này sẽ được chạy khi Observable `interval$` bị unsubscribe
  return () => {
    console.log('complete');
    clearInterval(intervalInstance);
  }
});

const subscription = interval$.subscribe(
  // Class `Observable` sẽ tự động map lần lượt các function dưới đây vào các phương thức `next`, `error` và `complete`
  // của object observer mà nó chuyền vào trong executor
  intervalCounter => console.log(intervalCounter),
  error => console.log(error),
);

setTimeout(() => {
  subscription.unsubscribe();
}, 2000);

2.3.2 Observable contract

Observable contract là một quy định trong RxJS:

  • Một luồng sẽ không thể nhận các gói dữ liệu sau khi observer.complete được gọi hoặc sau khi observer.error được gọi.
  • Khi observer.error đã được gọi, luồng sẽ không thể nhận được bất kỳ gói dữ liệu nào từ hoặc kích hoạt observer.complete nữa.
  • Khi observer.complete đã được gọi, luồng sẽ không thể nhận được bất kỳ gói dữ liệu nào từ hoặc kích hoạt observer.error nữa.

2.3.3 Observable có thể được sử dụng để xử lý bất đồng bộ?

Mình đã thấy rất nhiều người cho rằng RxJS hay Observable được sử dụng để xử lý bất đồng bộ. Nhưng thực tế thì cả RxJS và Observable không liên quan gì đến việc xử lý bất đồng bộ, tất cả những gì RxJS làm chỉ là giúp chúng ta mô phỏng lại lập trình Reactive. Việc một Observable được dùng để xử lý bất đồng bộ hay không hoàn toàn phụ thuộc vào cách chúng ta xử lý executor. Như ví dụ dưới đây, mình đã tạo một Observable xử lý đồng bộ.

import { Observable, noop } from "rxjs";

const syncObservable$ = new Observable(observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

syncObservable$.subscribe(
  intervalCounter => console.log(intervalCounter),
  noop,
  () => console.log('complete')
);

console.log(4);

Như bạn có thể thấy, mình không gọi phương thức error trong executor của Observable syncObservable$, vì vậy việc định nghĩa một hàm xử lý lỗi cho Observable syncObservable$ là hoàn toàn không cần thiết. Đối với những trường hợp như này hoặc nếu bạn không quan tâm đến việc xử lý một trong 3 phương thức của object observer, bạn có thể truyền noop vào vị trí tương ứng.

Kết luận

Vậy là mình đã giải thích xong với các bạn về lập trình Reactive và các khái niệm chính trong thư viện RxJS. Mục đích của bài viết này không đi sâu vào thư viện RxJS, đặc biệt là về operator, một trong những điểm hấp dẫn của thư viện này và Subject (một dạng Observable đặc biệt). Mình chỉ muốn giới thiệu một cái nhìn tổng quan, dễ hiểu về RxJS để giúp các bạn mới bắt đầu học RxJS của mình trở nên dễ dàng hơn. Thực tế, bạn chỉ cần nắm vững những khái niệm ở trên thì bạn sẽ gần như đã nắm vững thư viện này, những thứ như Subject hay operator… sẽ trở nên cực kỳ dễ hiểu và dễ áp dụng. Chúc các bạn một ngày vui vẻ. Happy coding!

Được chỉnh sửa bởi: Dnulib
Xem thêm thông tin tại Dnulib