Hands-On Reactive Programming with Clojure

Observer – an iterator's dual

Clojure's sequence operators, such as map, filter, reduce, and so on, support Java iterables. As the name implies, an iterable is an object that can be iterated over. At a low level, this is supported by retrieving an Iterator reference from such an object. Java's Iterator interface looks like the following:

public interface Iterator<E> { 
    boolean hasNext(); 
    E next(); 
    void remove(); 
} 

When passed an object that implements this interface, Clojure's sequence operators pull data from it by using the next method, while using the hasNext method to know when to stop.
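To make the pull model concrete, here is a minimal Java sketch of a consumer driving an iterator by hand. The class name PullDemo and the helper drain are hypothetical names chosen for illustration; this is not how Clojure's sequence operators are implemented internally, only the same hasNext/next protocol they rely on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PullDemo {
    // Hypothetical helper: the consumer decides when to ask for data.
    public static <E> List<E> drain(Iterator<E> it) {
        List<E> out = new ArrayList<>();
        while (it.hasNext()) {   // ask whether more data is available
            out.add(it.next());  // pull the next element
        }
        return out;
    }

    public static void main(String[] args) {
        Iterator<Integer> it = Arrays.asList(1, 2, 3).iterator();
        System.out.println(drain(it)); // prints [1, 2, 3]
    }
}
```

Note that nothing happens until the consumer calls hasNext or next: the pace of iteration is entirely in the consumer's hands.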

The remove method is specified to remove, from the underlying collection, the last element returned by next. This in-place mutation is clearly unsafe in a multithreaded environment. Whenever Clojure implements this interface for the purposes of interoperability, the remove method simply throws UnsupportedOperationException.
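The same read-only behavior can be sketched in plain Java. The example below is an analogy, not Clojure's own code: RangeIterator, range, and removeIsUnsupported are hypothetical names, and the sketch assumes Java 8 or later, where Iterator.remove has a default implementation that throws UnsupportedOperationException.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ReadOnlyIteratorDemo {
    // Hypothetical read-only iterator over the integers in [from, to).
    // It does not override remove(), so the Java 8+ default applies.
    static final class RangeIterator implements Iterator<Integer> {
        private int current;
        private final int end;

        RangeIterator(int from, int to) {
            this.current = from;
            this.end = to;
        }

        @Override
        public boolean hasNext() {
            return current < end;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current++;
        }
    }

    public static Iterator<Integer> range(int from, int to) {
        return new RangeIterator(from, to);
    }

    // Returns true if remove() is rejected with UnsupportedOperationException.
    public static boolean removeIsUnsupported(Iterator<?> it) {
        try {
            it.remove();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> it = range(0, 3);
        it.next();                                   // consume one element first
        System.out.println(removeIsUnsupported(it)); // prints true
    }
}
```

Refusing remove keeps the iterator a pure reader, so iteration never mutates the data it walks over.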

An observable, on the other hand, has observers subscribed to it. Observers have the following interface:

public interface Observer<T> { 
    void onCompleted(); 
    void onError(Throwable e); 
    void onNext(T t); 
} 

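As we can see, an observer implementing this interface will have its onNext method called with each new value produced by the observable it is subscribed to, while onCompleted and onError signal that the stream has ended, either normally or with a failure. The observable, not the observer, decides when data arrives, which makes this a push-based notification model.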
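The push model can be sketched with a few lines of Java. This is a simplified illustration under stated assumptions, not the RxJava implementation: the Observer interface is redeclared locally so the example is self-contained, pushAll and record are hypothetical helpers, and values are pushed synchronously on the calling thread, whereas a real observable may deliver them asynchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PushDemo {
    // Same shape as the Observer interface shown above,
    // redeclared here so the sketch compiles on its own.
    public interface Observer<T> {
        void onCompleted();
        void onError(Throwable e);
        void onNext(T t);
    }

    // Hypothetical producer: it drives the flow, pushing each item
    // to the observer and then signalling completion or failure.
    public static <T> void pushAll(Iterable<T> source, Observer<? super T> observer) {
        try {
            for (T item : source) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onCompleted();
    }

    // Hypothetical helper: records every notification as a string.
    public static List<String> record(Iterable<Integer> source) {
        List<String> log = new ArrayList<>();
        pushAll(source, new Observer<Integer>() {
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable e) { log.add("error: " + e.getMessage()); }
            @Override public void onNext(Integer t) { log.add("next: " + t); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList(1, 2, 3)));
        // prints [next: 1, next: 2, next: 3, completed]
    }
}
```

Here the observer never asks for data. It only reacts to whatever the producer chooses to send, which is the inverse of the hasNext/next loop used with an iterator.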

This duality[1] becomes clearer if we look at both interfaces side by side:

Iterator<E> {                       Observer<T> { 
    boolean hasNext();                  void onCompleted(); 
    E next();                           void onError(Throwable e); 
    void remove();                      void onNext(T t); 
}                                       } 

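The methods pair up as follows: onNext is the push counterpart of next, onCompleted plays the role of hasNext returning false, and onError stands in for an exception thrown while iterating; remove has no counterpart. Observables provide the ability to have producers push items asynchronously to consumers. A few examples will help solidify our understanding.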