30 días con RxJS

Jorge Cano, inspirado por la iniciativa de Web Bos de 30 días con JavaScript, creó 30 días con RxJS. Y aquí están mis notas, que he ido tomando a lo largo de los días, siguiendo el camino de Jorge, un día cada vez.

Bueno, Jorge no llegó a producir los 30 días. Veremos hasta qué día llego yo consumiéndolo.

Día 1

Como suponía, cosas básicas: qué es un Observable, un Observer, un Subject,… Cómo suscribirse…

Nada realmente interesante

Día 2

Operadores:

Día 3

Cuadrante sobre valores únicos o múltiples, síncronos o asíncronos.

Los observables se encuadran en valores múltiples asíncronos.

En lugar de síncronos/asíncronos, Jorge habla de sistemas productor/consumidor pull/push. Pull sería cuando es el consumidor el que decide cuando recibir los datos. Por ejemplo, una función que es llamada. El llamante decide cuándo va a recibir el valor producido/devuleto por la función.

Los observables serían sistemas push, es el productor quien decide cuándo se producen los valores. Estos valores son entregados a los consumidores sin que éstos sepan el momento, son entregados asíncronamente.

Al menos esta relación síncrono-pull/asíncrono-push es la que he hecho yo en mi cabeza.

Día 4: observables como generalizaciones de funciones

En otras ocasiones hemos visto los Observables como:

Hoy hace un paralelismo entre Observables y funciones:

Los Observables son asíncronos

Bueno, eso no tiene por qué ser verdad del todo. Cuando un Observer se suscribe puede recibir los valores de forma síncrona:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200); // "return" yet another
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

/* result:
before
Hello
42
100
200
after
*/

Y también asíncrona, si a la hora de crear el Observable así lo queremos:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  setTimeout(() => observer.next(200), 1000);
});

func.call() significa dame un valor sincrónicamente

observable.subscribe() significa dame cualquier cantidad de valores, de forma síncrona o asíncrona

Día 5

Los observables se crean con Observable.create o con operadores como of, interval, from,…

Nos cuenta qué papel juegan los métodos next, error y complete de los Observers.

El método Observable.subscribe devuelve una suscripción, la cual tiene un método llamado unsubscribe que sirve para cancelar una suscripción. Este método para cancelar puede ser personalizado a la hora de crear una suscripción. El método unsubscribe es la función que nosotros retornemos al crear el Observable con Observable.create.

Día 6

Un Observer es un objeto con tres métodos, uno por cada tipo de notificación que puede entregar el Observable: next, error y complete

Observable.observe también admite funciones como parámetros. Internamente construye un objecto Observer asignándole esas funciones a next,…

Día 7

Hoy va de suscripciones.

Una suscripción tiene básicamente un método unsubscribe para cancelar una suscripción y así, nuestro Observer, dejará de recibir datos.

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();

Una suscripción también tiene un método add, para agrupar varias suscripciones y poder cancelarlas todas de una vez.

Día 8: Subjects

Mientras que un Observable envía valores a un solo Observer y para cada Observer que se suscribe tiene una ejecución diferente, un Subject puede enviar values a varias suscripciones (hace multicast). Los Subjects son parecidos a emisores de eventos, mantienen una lista de listeners/suscriptores.

Todo Subject es un Observable. Los Observers pueden suscribirse a él.

Todo Subject es un Observer. Tiene métodos next, error y complete, que permite controlar el flujo de datos.

Los Subjects suelen ser utilizados para enviar a múltiples Observers los valores de un Observable, es decir, para convertir un Observable unicast en uno multicast. Es una manera de que una ejecución Observable sea compartida por varios Observers.

Día 9: Observables multicast

No es necesario que hagamos todo el trabajo nosotros para convertir un Observable unicast a multicast con un Subject. Los Observables tienen un método multicast que nos ahorra mucho trabajo, usando subscribe al Subject y al Observable de la forma correcta:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({...});
multicasted.subscribe({...});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

Día 10: contando referencias

Es muy engorroso estar esperando a que haya Observers suscritos para llamar a connect del Observable multicast.

Hay una manera de hacer que el Observable multicast comience a enviar datos cuando el primer Observer se suscriba y deje de hacerlo cuando el último Observer cancele la suscripción, ConnectableObservable.refCount().

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2;

// observer #1 subscribed, multicasted observable starts
setTimeout(() => subscription1 = refCounted.subscribe({...}), 500);
// observer #2 subscribed
setTimeout(() => subscription2 = refCounted.subscribe({...}), 600);

// observer #1 unsubscribes
setTimeout(() => subscription1.unsubscribe(), 1200);
// observer #2 unsubscribes, multicasted observable stops
setTimeout(() => subscription2.unsubscribe(), 2000);

Día 11

Hoy estudio una variante de Subject, los BehaviourSubject, el cual tiene una noción de valor actual, de valor vivo actualmente en el flujo.

Un BehaviourSubject almacena el último valor emitido a sus consumidores de forma que cada vez que se suscribe uno nuevo, éste último valor es el primer valor que recibe.

Jorge hace un símil: un flujo de eventos de tu cumpleaños es un Subject (son eventos que ocurren a lo largo del tiempo). El flujo de la edad de una persona es un BehaviourSubject (cuando conoce a una nueva persona le da el valor de su edad, su edad es el valor actual del flujo durante un año)

Día 12

Aquí otra variante de Subject, ReplaySubject, bastante más interesante que BehaviourSubject. ReplaySubject puede enviar valores antiguos a los nuevos suscriptores.

Puede almacenar n valores:

var subject = new Rx.ReplaySubject(3);

También se puede especificar un tiempo en milisegundos. En este ejemplo guardará 100 valores antiguos o los ocurridos en los últimos 500 ms.

var subject = new Rx.ReplaySubject(100, 500);

Día 13

Otra variante más de Subject, AsyncSubject. Esta variante espera a que el flujo termine (se llame al método complete). Entonces, y solo entonces, AsyncSubject envía el último valor a sus suscriptores. Sólo entrega un valor.

Día 14

Por fin termina con los Subjects. Hoy habla de operadores:

Los operadores son las piezas esenciales que permiten que el código asincrónico complejo se componga fácilmente de forma declarativa

Los operadores son básicamente funciones que toman un Observable como parámetro y devuelve otro Observable sin modificar la entrada.

function multiplyByTen(input) {
  return Rx.Observable.create(observer => {
    input.subscribe({
      next: v => observer.next(10 * v),
      error: err => observer.error(err),
      complete: () => observer.complete()
    });
  });
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

Cuando nos suscribimos a output, se ejecuta el Observable creado en multiplyByTen, el cual se suscribe a input pasado como parámetro. Esta suscripción en cadena se llama: cadena de suscripción de operador.

Día 15

Según Jorge, cuando se habla de operadores, generalmente, nos estamos refiriendo a operadores de instancia. RxJS cambió bastante su API en la versión 6, y no sé si esto sigue siendo cierto.

Los operadores de instancia son métodos en instancias observables, utilizan this como entrada observable (la variable input del día anterior).

Se usarían así:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {...}
const observable = Rx.Observable.from([...]);
observable
  .multiplyByTen()
  .subscribe(...);

En la nueva versión de RxJS, deberíamos usar el método pipe, y multiplyByTen creo que ya no es nunca un método del prototipo de Rx.Observable. Creo que en este caso, este curso se ha quedado atrás.

Otro tipo de operadores son los operadores estáticos, que son funciones asociadas directamente a la clase Rx.Observable. Normalmente éstos son los operadores de creación. Su entrada no suele ser un observable, si no otro tipo de datos, y con ellos crean un observable.

Ejemplos de estos operadores serían interval y from. De nuevo, a partir de la versión 6 de RxJS, no estoy seguro de que esto sea cierto, ya que estos operadores se pueden importar independientemente de la clase Observable.

Día 16

Hoy aprendemos a interpretar los diagramas de canicas (marble diagrams). Y qué mejor que un diagrama explicado para ello:

Marble diagram

Nota: imagen obtenida del artículo de Jorge Cano

Día 17

Hmmm, no muy útil, simplemente cómo navegar por la documentación de ReactiveX

Día 18

Este día utiliza bastantes operadores de RxJS. En principio, ninguno de ellos es raro, todos son entendibles por separado. Pero el ejemplo final es algo más complejo de entender en su conjunto.

En ejemplo final consta de:

En realidad, no parece que use cosas muy complejas de RxJS. En el flujo de activar/desactivar modifica un flag. De echo, la modificación del flag es un poco liosa. En el flujo de seguir al ratón, comprueba el flag. Si el flag está desactivado, no mueve el texto. No es muy reactivo ese if por ahí metido.

El código del ejemplo se puede ver aquí.

Día 19

La actualización a la versión 6 le ha pillado a medias de la serie. Y creo que es lo que le ha matado, porque la serie se queda en el día 20, mañana lo veremos y lo acabaremos.

Día 20

Hoy es el día en que se da cuenta de cuánto ha cambiado RxJS. Habla del operador pipe, que es la nueva forma de encadenar operadores.