Hands-On Reactive Programming with Clojure

Implementing the application code

We already have a project depending on core.async, which we created in the previous section, so we'll be working off that. Let's start by adding an extra dependency on seesaw to our project.clj file:

  :dependencies [[org.clojure/clojure "1.9.0"] 
                 [org.clojure/core.async "0.4.474"] 
                 [seesaw "1.5.0"]] 

Next, we need to create a file called stock_market.clj in the src/core_async_playground directory, so that its path matches the namespace, and add the following namespace declaration:

(ns core-async-playground.stock-market
  (:require [clojure.core.async
             :refer [go chan <! >! timeout go-loop map>] :as async]
            [clojure.core.async.lab :refer [broadcast]])
  (:use [seesaw.core]))

This might be a good point to restart your REPL if you haven't done so. Don't worry about any functions we haven't seen yet. We'll get a feel for them in this section.

The GUI code remains largely unchanged, so no explanation should be necessary for the following snippet:

(native!) 
 
(def main-frame (frame :title "Stock price monitor" 
                       :width 200 :height 100 
                       :on-close :exit)) 
 
(def price-label       (label "Price: -")) 
(def running-avg-label (label "Running average (5): -")) 
 
(config! main-frame :content 
         (border-panel 
          :north  price-label 
          :center running-avg-label 
          :border 5)) 
 
(defn share-price [company-code] 
  (Thread/sleep 200) 
  (rand-int 1000)) 
 
(defn avg [numbers] 
  (float (/ (reduce + numbers) 
            (count numbers)))) 
 
(defn roll-buffer [buffer val buffer-size] 
  (let [buffer (conj buffer val)] 
    (if (> (count buffer) buffer-size) 
      (pop buffer) 
      buffer)))

(defn make-sliding-buffer [buffer-size] 
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)] 
    (fn [n] 
      (swap! buffer roll-buffer n buffer-size)))) 
 
(def sliding-buffer (make-sliding-buffer 5)) 

The only difference is that we now have a sliding-buffer function that returns a window of data. This is in contrast to our original application, where the rolling-avg function was responsible for both creating the window and calculating the average. The new design is more general: the windowing function can be reused on its own, independently of how the average is calculated. The sliding logic is the same, however.
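To make the separation of concerns concrete, here is a minimal REPL sketch that feeds a few numbers through a window and then averages each window. The definitions are repeated from the preceding listing so that the snippet runs on its own; demo-window, windows, and averages are hypothetical names used only for this illustration, and a window size of 3 is chosen to keep the output short.

```clojure
;; Definitions repeated from the listing above so the snippet is self-contained.
(defn avg [numbers]
  (float (/ (reduce + numbers)
            (count numbers))))

(defn roll-buffer [buffer val buffer-size]
  (let [buffer (conj buffer val)]
    (if (> (count buffer) buffer-size)
      (pop buffer)
      buffer)))

(defn make-sliding-buffer [buffer-size]
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)]
    (fn [n]
      (swap! buffer roll-buffer n buffer-size))))

;; Hypothetical window of size 3, separate from the shared sliding-buffer.
(def demo-window (make-sliding-buffer 3))

;; Each call returns the current window; vec makes the queue printable.
(def windows (mapv (comp vec demo-window) [10 20 30 40]))
;; windows => [[10] [10 20] [10 20 30] [20 30 40]]

;; Averaging is now a separate step applied to whatever window we receive.
(def averages (mapv avg windows))
;; averages => [10.0 15.0 20.0 30.0]
```

Note that the function returned by make-sliding-buffer is stateful: every call changes what the next call returns, which is why a fresh window is created here rather than reusing sliding-buffer.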

Next, we implement our main application logic using core.async:

(defn broadcast-at-interval [msecs task & ports] 
  (go-loop [out (apply broadcast ports)] 
    (<! (timeout msecs)) 
    (>! out (task)) 
    (recur out))) 
 
(defn -main [& args] 
  (show! main-frame) 
  (let [prices-ch         (chan) 
        sliding-buffer-ch (map> sliding-buffer (chan))] 
    (broadcast-at-interval 500 #(share-price "XYZ") prices-ch sliding-buffer-ch) 
    (go-loop [] 
      (when-let [price (<! prices-ch)] 
        (text! price-label (str "Price: " price)) 
        (recur))) 
    (go-loop [] 
      (when-let [buffer (<! sliding-buffer-ch)] 
        (text! running-avg-label (str "Running average: " (avg buffer))) 
        (recur))))) 

Let's walk through the code.

The first function, broadcast-at-interval, is responsible for creating the broadcasting channel. It receives a variable number of arguments: a number of milliseconds describing the interval, the function representing the task to be executed, and a sequence of one or more output channels. These channels are used to create the broadcasting channel to which the go loop will be writing prices.
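The broadcasting behaviour can be tried in isolation at the REPL. The following is a minimal sketch, assuming core.async is on the classpath; the channel names a, b, and out and the value 42 are made up for this illustration. The target channels are given a buffer of 1 here so that the write to each of them can complete before anyone reads.

```clojure
(require '[clojure.core.async :refer [chan go >! <!!]]
         '[clojure.core.async.lab :refer [broadcast]])

;; Two hypothetical output channels, each with room for one value.
(def a (chan 1))
(def b (chan 1))

;; Writing to out writes the same value to both a and b.
(def out (broadcast a b))

(go (>! out 42))

;; Blocking takes outside a go block; each channel yields its own copy.
(def received [(<!! a) (<!! b)])
;; received => [42 42]
```

In broadcast-at-interval the same kind of port is created once, as the initial loop binding, and then written to on every iteration.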

Next, we have our main function. The let block is where the interesting bits are. As we discussed in our high-level diagrams, we need two output channels: one for prices and one for the sliding window. They are both created in the following code:

... 
  (let [prices-ch         (chan) 
        sliding-buffer-ch (map> sliding-buffer (chan))] 
... 

prices-ch should be self-explanatory; however, sliding-buffer-ch is using a function we haven't encountered before: map>. This is yet another useful channel constructor in core.async. It takes two arguments: a function and a target channel. It returns a channel that applies this function to each value before writing it to the target channel. An example will help illustrate how it works:

    (def c (map> sliding-buffer (chan 10)))
    (go (doseq [n (range 10)]
          (>! c n)))
    (go (doseq [n (range 10)]
          (prn  (vec (<! c)))))
    
    ;; [0]
    ;; [0 1]
    ;; [0 1 2]
    ;; [0 1 2 3]
    ;; [0 1 2 3 4]
    ;; [1 2 3 4 5]
    ;; [2 3 4 5 6]
    ;; [3 4 5 6 7]
    ;; [4 5 6 7 8]
    ;; [5 6 7 8 9]  
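As a side note, map> is marked as deprecated in core.async in favor of transducers. The sketch below shows one way the same sliding-window channel might be built with a transducer instead; it is an alternative for comparison, not the approach used in this section. The names demo-window, c, and received are hypothetical, and the window definitions are repeated so that the snippet runs on its own.

```clojure
(require '[clojure.core.async :refer [chan go >! <!! close!]])

;; Definitions repeated from the earlier listing.
(defn roll-buffer [buffer val buffer-size]
  (let [buffer (conj buffer val)]
    (if (> (count buffer) buffer-size)
      (pop buffer)
      buffer)))

(defn make-sliding-buffer [buffer-size]
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)]
    (fn [n]
      (swap! buffer roll-buffer n buffer-size))))

;; Hypothetical fresh window so the example starts from an empty state.
(def demo-window (make-sliding-buffer 5))

;; A channel with a transducer must be given a buffer.
;; (map demo-window) transforms each value as it is put on the channel.
(def c (chan 10 (map demo-window)))

;; Write ten numbers, then close the channel so the reader can stop.
(go (doseq [n (range 10)]
      (>! c n))
    (close! c))

;; Collect every window until the channel is closed and drained.
(def received
  (loop [acc []]
    (if-some [window (<!! c)]
      (recur (conj acc (vec window)))
      acc)))
;; (last received) => [5 6 7 8 9]
```

The windows arrive in the same order as in the map> example, because the transformation is applied to each value in the order it is written.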

That is, we write a price to the channel and get a sliding window on the other end. Finally, we create the two go blocks containing the side effects. Each one loops indefinitely, taking values from its own channel and updating the corresponding label in the user interface.

You can see it in action by running the program from the Terminal:

$ lein run -m core-async-playground.stock-market