aleph.examples

0.4.6


dependencies

aleph                   0.4.6
gloss                   0.2.6
compojure               1.6.1
org.clojure/clojure     1.9.0
org.clojure/core.async  0.4.474



 
(ns aleph.examples.tcp
  (:require
    [manifold.deferred :as d]
    [manifold.stream :as s]
    [clojure.edn :as edn]
    [aleph.tcp :as tcp]
    [gloss.core :as gloss]
    [gloss.io :as io]))

Complete documentation for the aleph.tcp namespace can be found here.

the basics

This uses Gloss, a library for defining byte formats, which are automatically compiled into encoders and streaming decoders.

Here, we define a simple protocol where each frame starts with a 32-bit integer describing the length of the string which follows. We assume the string is EDN-encoded, and so we define a pre-encoder of pr-str, which will turn our arbitrary value into a string, and a post-decoder of clojure.edn/read-string, which will transform our string into a data structure.

(def protocol
  (gloss/compile-frame
    (gloss/finite-frame :uint32
      (gloss/string :utf-8))
    pr-str
    edn/read-string))
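
To make the frame layout concrete, here is a hand-rolled sketch of the same wire format using only java.nio. It assumes the length prefix is written in big-endian byte order (which, as far as we know, is the Gloss default); encode-frame and decode-frame are hypothetical names for illustration and are not part of Gloss.

```clojure
(ns example.framing
  (:require [clojure.edn :as edn])
  (:import [java.nio ByteBuffer]
           [java.nio.charset StandardCharsets]))

(defn encode-frame
  "Hypothetical helper: 4-byte length prefix followed by the UTF-8 EDN text."
  ^bytes [value]
  (let [^bytes payload (.getBytes ^String (pr-str value) StandardCharsets/UTF_8)
        buf            (ByteBuffer/allocate (+ 4 (alength payload)))]
    (.putInt buf (alength payload))   ; ByteBuffer is big-endian by default
    (.put buf payload)
    (.array buf)))

(defn decode-frame
  "Hypothetical helper: reads the length prefix, then parses that many bytes as EDN."
  [^bytes frame]
  (let [buf     (ByteBuffer/wrap frame)
        len     (.getInt buf)
        payload (byte-array len)]
    (.get buf payload)
    (edn/read-string (String. payload StandardCharsets/UTF_8))))

(alength (encode-frame {:a 1}))          ;=> 10 (4-byte prefix + the 6 bytes of "{:a 1}")
(decode-frame (encode-frame {:a 1}))     ;=> {:a 1}
```

Unlike this sketch, the Gloss decoder also copes with frames that arrive split across several reads, which is why the real protocol is defined with Gloss.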

This function takes a raw TCP duplex stream which represents bidirectional communication via a single stream. Messages from the remote endpoint can be consumed via take!, and messages can be sent to the remote endpoint via put!. It returns a duplex stream which will take and emit arbitrary Clojure data, via the protocol we've just defined.

First, we define a connection between out and the raw stream, which will take all the messages from out and encode them before passing them onto the raw stream.

Then, we splice together a separate sink and source, so that they can be presented as a single duplex stream. We've already defined our sink, which will encode all outgoing messages. We must combine that with a decoded view of the incoming stream, which is accomplished via gloss.io/decode-stream.

(defn wrap-duplex-stream
  [protocol s]
  (let [out (s/stream)]
    (s/connect
      (s/map #(io/encode protocol %) out)
      s)
    (s/splice
      out
      (io/decode-stream s protocol))))
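
The behaviour of splice can be seen in isolation with two in-memory streams. This is a minimal sketch; the names sink, source and duplex are ours, standing in for the encoder input and the decoded view above.

```clojure
(require '[manifold.stream :as s])

;; Two independent in-memory streams.
(def sink   (s/stream))
(def source (s/stream))

;; Present them as one duplex stream.
(def duplex (s/splice sink source))

;; Anything put into the duplex stream lands in `sink`...
(s/put! duplex :outgoing)
@(s/take! sink)     ;=> :outgoing

;; ...and anything emitted by `source` is what consumers of the duplex stream see.
(s/put! source :incoming)
@(s/take! duplex)   ;=> :incoming
```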

The call to aleph.tcp/client returns a deferred, which will yield a duplex stream that can be used to both send and receive bytes. We asynchronously compose over this value using manifold.deferred/chain, which will wait for the client to be realized, and then pass the client into wrap-duplex-stream. The call to chain will return immediately with a deferred value representing the eventual wrapped stream.

(defn client
  [host port]
  (d/chain (tcp/client {:host host, :port port})
    #(wrap-duplex-stream protocol %)))
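
The "returns immediately" behaviour of chain is easiest to see with a deferred we realize by hand. A minimal sketch, with a plain number standing in for the client connection:

```clojure
(require '[manifold.deferred :as d])

(def pending (d/deferred))

;; chain returns right away, even though `pending` has no value yet
(def result (d/chain pending inc str))

(d/realized? result)   ;=> false

;; once the upstream deferred is realized, each function is applied in order
(d/success! pending 1)

@result                ;=> "2"
```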

Takes a two-argument handler function, which takes a stream and information about the connection, and sets up message handling for the stream. The raw stream is wrapped in the Gloss protocol before being passed into handler.

(defn start-server
  [handler port]
  (tcp/start-server
    (fn [s info]
      (handler (wrap-duplex-stream protocol s) info))
    {:port port}))

echo servers

This creates a handler which will apply f to any incoming message, and immediately send back the result. Notice that we are connecting s to itself, but since it is a duplex stream this is simply an easy way to create an echo server.

(defn fast-echo-handler
  [f]
  (fn [s info]
    (s/connect
      (s/map f s)
      s)))
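
The same map-then-connect shape can be tried without a network connection. In this sketch two separate in-memory streams, incoming and outgoing (names of our choosing), play the two directions of the duplex stream:

```clojure
(require '[manifold.stream :as s])

(def incoming (s/stream))
(def outgoing (s/stream))

;; every message from `incoming` is transformed and forwarded to `outgoing`
(s/connect (s/map inc incoming) outgoing)

(s/put! incoming 1)
@(s/take! outgoing)   ;=> 2
```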

demonstration

We start a server s which will return incremented numbers.

(def s
  (start-server
    (fast-echo-handler inc)
    10000))

We connect a client to the server, dereferencing the deferred value returned such that c is simply a duplex stream that takes and emits Clojure values.

(def c @(client "localhost" 10000))

We put! a value into the stream, which is encoded to bytes and sent as a TCP packet. Since TCP is a streaming protocol, it is not guaranteed to arrive as a single packet, so the server must be robust to split messages. Since both client and server are using Gloss codecs, this is automatic.

@(s/put! c 1)  ; => true

The message is parsed by the server, and the response is sent, which again may be split while in transit between the server and client. The bytes are consumed and parsed by wrap-duplex-stream, and the decoded message can be received via take!.

@(s/take! c)   ; => 2

The server implements java.io.Closeable, and can be stopped by calling close().

(.close s)

end demonstration

While we can do trivial computation on the same thread we receive messages, longer computation or blocking operations should be done elsewhere. To accomplish this, we need something a little more complicated than connecting a stream to itself.

Here, we define an asynchronous loop via manifold.deferred/loop. In this loop, we take a message from the stream, transform it on another thread with manifold.deferred/future, send it back, and then repeat.

(defn slow-echo-handler
  [f]
  (fn [s info]
    (d/loop []
      ;; take a message, and define a default value that tells us if the connection is closed
      (-> (s/take! s ::none)
        (d/chain
          ;; first, check if there even was a message, and then transform it on another thread
          (fn [msg]
            (if (= ::none msg)
              ::none
              (d/future (f msg))))
          ;; once the transformation is complete, write it back to the client
          (fn [msg']
            (when-not (= ::none msg')
              (s/put! s msg')))
          ;; if we were successful in our response, recur and repeat
          (fn [result]
            (when result
              (d/recur))))
        ;; if there were any issues on the far end, send a stringified exception back
        ;; and close the connection
        (d/catch
          (fn [ex]
            (s/put! s (str "ERROR: " ex))
            (s/close! s)))))))

Alternately, we use manifold.deferred/let-flow to implement the composition of these asynchronous values. It is certainly more concise, but at the cost of being less explicit.

(defn slow-echo-handler
  [f]
  (fn [s info]
    (d/loop []
      (->
        (d/let-flow [msg (s/take! s ::none)]
          (when-not (= ::none msg)
            (d/let-flow [msg'   (d/future (f msg))
                         result (s/put! s msg')]
              (when result
                (d/recur)))))
        (d/catch
          (fn [ex]
            (s/put! s (str "ERROR: " ex))
            (s/close! s)))))))
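
Both versions rest on the same pattern: take with a sentinel default, do some work, and recur until the stream is drained. Stripped of networking, that pattern looks roughly like the sketch below, where sum-stream is a hypothetical helper that adds up every number in a stream.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn sum-stream
  "Returns a deferred yielding the sum of all numbers emitted by `stream`."
  [stream]
  (d/loop [total 0]
    (d/chain (s/take! stream ::drained)
      (fn [msg]
        (if (= ::drained msg)
          ;; the stream is exhausted, so the loop yields its final value
          total
          ;; otherwise go around again with the updated total
          (d/recur (+ total msg)))))))

@(sum-stream (s/->source [1 2 3]))   ;=> 6
```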

demonstration

We start a server s which will return incremented numbers, slowly.

(def s
  (start-server
    (slow-echo-handler
      (fn [x]
        (Thread/sleep 1000)
        (inc x)))
    10000))
(def c @(client "localhost" 10000))
@(s/put! c 1)  ; => true
@(s/take! c)   ; => 2
(.close s)

end demonstration

 
(ns aleph.examples.udp
  (:require
    [manifold.deferred :as d]
    [manifold.stream :as s]
    [aleph.udp :as udp]
    [clojure.string :as str]
    [byte-streams :as bs]))

Full documentation for the aleph.udp namespace can be found here.

This example is a very pared-down version of statsd, which takes in UDP packets from the entire system, and does periodic rollups that are typically forwarded to Graphite.

(def server-port 10002)

This creates a socket without a port, which we can only use to send messages. We dereference this, since it will typically complete immediately.

(def client-socket @(udp/socket {}))

This encodes a message in the typical statsd format, which is two strings, metric and value, delimited by a colon.

(defn send-metric!
  [metric ^long value]
  (s/put! client-socket
    {:host "localhost"
     :port server-port
     ;; The UDP contents can be anything which byte-streams can coerce to a byte-array.  If
     ;; the encoded message were to exceed the maximum UDP datagram size (just under 64KB),
     ;; this would fail, and `send-metric!` would return a deferred value that yields an error.
     :message (str metric ":" value)}))

This is the inverse operation of send-metric!, taking the message, splitting it on the colon delimiter, and parsing the value.

(defn parse-statsd-packet
  [{:keys [message]}]
  (let [message        (bs/to-string message)
        [metric value] (str/split message #":")]
    [metric (Long/parseLong value)]))
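
The parser above assumes every packet is well-formed. Since UDP packets can come from anywhere, a more defensive variant may be worth considering. The sketch below works on an already-decoded string; parse-metric-line is a hypothetical name, and returning nil for bad input is our own design choice.

```clojure
(require '[clojure.string :as str])

(defn parse-metric-line
  "Returns [metric value] for a line like \"requests:3\", or nil if it is malformed."
  [line]
  (let [[metric value] (str/split line #":" 2)]
    (when (and metric value)
      (try
        [metric (Long/parseLong value)]
        (catch NumberFormatException _
          nil)))))

(parse-metric-line "requests:3")   ;=> ["requests" 3]
(parse-metric-line "garbage")      ;=> nil
(parse-metric-line "a:b:c")        ;=> nil
```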

An atomic operation which returns the previous value, and sets it to new-val.

(defn get-and-set!
  [a new-val]
  (let [old-val @a]
    (if (compare-and-set! a old-val new-val)
      old-val
      (recur a new-val))))
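
As an aside, Clojure 1.9 (the version listed in the dependencies) added clojure.core/reset-vals!, which performs the same swap atomically and returns both the old and new values. A minimal sketch of the equivalent call:

```clojure
(def acc (atom {"a" 1}))

;; reset-vals! returns [old-value new-value]
(first (reset-vals! acc {}))   ;=> {"a" 1}

@acc                           ;=> {}
```
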
(defn start-statsd-server
  []
  (let [accumulator   (atom {})
        server-socket @(udp/socket {:port server-port})
        ;; Once a second, take all the values that have accumulated, `put!` them out, and
        ;; clear the accumulator.
        metric-stream (s/periodically 1000 #(get-and-set! accumulator {}))]
    ;; Listens on a socket, parses each incoming message, and increments the appropriate metric.
    (->> server-socket
      (s/map parse-statsd-packet)
      (s/consume
        (fn [[metric value]]
          (swap! accumulator update metric #(+ (or % 0) value)))))
    ;; If `metric-stream` is closed, close the associated socket.
    (s/on-drained metric-stream #(s/close! server-socket))
    metric-stream))

(def server (start-statsd-server))
(send-metric! "a" 1)
(send-metric! "b" 2)
@(s/take! server)     ; => {"a" 1, "b" 2}
(s/close! server)
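
The accumulation step is plain Clojure and can be tried without any sockets. This sketch uses fnil as an alternative spelling of the `(or % 0)` default above; the sample metrics are made up.

```clojure
(def accumulator (atom {}))

;; add each value to the running total for its metric, treating a missing entry as 0
(doseq [[metric value] [["a" 1] ["b" 2] ["a" 3]]]
  (swap! accumulator update metric (fnil + 0) value))

@accumulator   ;=> {"a" 4, "b" 2}
```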
 
(ns aleph.examples.websocket
  (:require
    [compojure.core :as compojure :refer [GET]]
    [ring.middleware.params :as params]
    [compojure.route :as route]
    [aleph.http :as http]
    [byte-streams :as bs]
    [manifold.stream :as s]
    [manifold.deferred :as d]
    [manifold.bus :as bus]
    [clojure.core.async :as a]))

(def non-websocket-request
  {:status 400
   :headers {"content-type" "application/text"}
   :body "Expected a websocket request."})

This handler sets up a websocket connection, and then proceeds to echo back every message it receives from the client. The value yielded by websocket-connection is a duplex stream, which represents communication to and from the client. Therefore, all we need to do in order to echo the messages is connect the stream to itself.

Since any request it gets may not be a valid handshake for a websocket request, we need to handle that case appropriately.

(defn echo-handler
  [req]
  (if-let [socket (try
                    @(http/websocket-connection req)
                    (catch Exception e
                      nil))]
    (s/connect socket socket)
    non-websocket-request))

The previous handler blocks until the websocket handshake completes, which unnecessarily takes up a thread. This accomplishes the same as above, but asynchronously.

(defn echo-handler
  [req]
  (-> (http/websocket-connection req)
    (d/chain
      (fn [socket]
        (s/connect socket socket)))
    (d/catch
      (fn [_]
        non-websocket-request))))

This is another asynchronous handler, but uses let-flow instead of chain to define the handler in a way that at least somewhat resembles the synchronous handler.

(defn echo-handler
  [req]
  (->
    (d/let-flow [socket (http/websocket-connection req)]
      (s/connect socket socket))
    (d/catch
      (fn [_]
        non-websocket-request))))
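
The fallback behaviour of catch in the asynchronous handlers can be checked without a real request. In this sketch an already-failed deferred stands in for a rejected handshake, and the keywords stand in for the socket setup and the error response:

```clojure
(require '[manifold.deferred :as d])

(def failed-handshake
  (d/error-deferred (ex-info "not a websocket handshake" {})))

;; the chained function is skipped, and the catch handler supplies the result
@(-> failed-handshake
   (d/chain (fn [socket] :connected))
   (d/catch (fn [_] :fallback-response)))   ;=> :fallback-response
```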

To represent all the different chat rooms, we use an event bus, which is a simple implementation of the publish/subscribe model.

(def chatrooms (bus/event-bus))

(defn chat-handler
  [req]
  (d/let-flow [conn (d/catch
                      (http/websocket-connection req)
                      (fn [_] nil))]
    (if-not conn
      ;; if it wasn't a valid websocket handshake, return an error
      non-websocket-request
      ;; otherwise, take the first two messages, which give us the chatroom and name
      (d/let-flow [room (s/take! conn)
                   name (s/take! conn)]
        ;; take all messages from the chatroom, and feed them to the client
        (s/connect
          (bus/subscribe chatrooms room)
          conn)
        ;; take all messages from the client, prepend the name, and publish it to the room
        (s/consume
          #(bus/publish! chatrooms room %)
          (->> conn
            (s/map #(str name ": " %))
            (s/buffer 100)))
        ;; Compojure expects some sort of HTTP response, so just give it `nil`
        nil))))
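
The event bus itself can be exercised without any websockets. A minimal sketch, with a made-up topic name:

```clojure
(require '[manifold.bus :as bus]
         '[manifold.stream :as s])

(def rooms (bus/event-bus))

;; subscribing yields a stream of everything published to that topic from now on
(def subscription (bus/subscribe rooms "lobby"))

(bus/publish! rooms "lobby" "Alice: hello")

@(s/take! subscription)   ;=> "Alice: hello"
```
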
(def handler
  (params/wrap-params
    (compojure/routes
      (GET "/echo" [] echo-handler)
      (GET "/chat" [] chat-handler)
      (route/not-found "No such page."))))

(def s (http/start-server handler {:port 10000}))

Here we put! ten messages to the server, and read them back again.

(let [conn @(http/websocket-client "ws://localhost:10000/echo")]

  (s/put-all! conn
    (->> 10 range (map str)))

  (->> conn
    (s/transform (take 10))
    s/stream->seq
    doall))    ;=> ("0" "1" "2" "3" "4" "5" "6" "7" "8" "9")

Here we create two clients, and have them speak to each other.

(let [conn1 @(http/websocket-client "ws://localhost:10000/chat")
      conn2 @(http/websocket-client "ws://localhost:10000/chat")]

  ;; sign our two users in
  (s/put-all! conn1 ["shoes and ships" "Alice"])
  (s/put-all! conn2 ["shoes and ships" "Bob"])

  (s/put! conn1 "hello")

  @(s/take! conn1)   ;=> "Alice: hello"
  @(s/take! conn2)   ;=> "Alice: hello"

  (s/put! conn2 "hi!")

  @(s/take! conn1)   ;=> "Bob: hi!"
  @(s/take! conn2))  ;=> "Bob: hi!"

(.close s)
 
(ns aleph.examples.http
  (:import
    [io.netty.handler.ssl SslContextBuilder])
  (:require
    [compojure.core :as compojure :refer [GET]]
    [ring.middleware.params :as params]
    [compojure.route :as route]
    [compojure.response :refer [Renderable]]
    [aleph.http :as http]
    [byte-streams :as bs]
    [manifold.stream :as s]
    [manifold.deferred :as d]
    [clojure.core.async :as a]
    [clojure.java.io :refer [file]]))

For HTTP, Aleph implements a superset of the Ring spec, which means it can be used as a drop-in replacement for pretty much any other Clojure webserver. In order to allow for asynchronous responses, however, it allows for the use of Manifold deferreds and streams. Uses of both will be illustrated below.

Complete documentation for the aleph.http namespace can be found here.

building servers

A basic Ring handler which immediately returns 'hello world'.

(defn hello-world-handler
  [req]
  {:status 200
   :headers {"content-type" "text/plain"}
   :body "hello world!"})

A non-standard response handler that returns a deferred which yields a Ring response after one second. In a typical Ring-compliant server, this would require holding onto a thread via Thread.sleep() or a similar mechanism, but the use of a deferred allows for the thread to be immediately released without an immediate response.

This is an atypical usage of manifold.deferred/timeout!, which puts a 'timeout value' into a deferred after an interval if it hasn't already been realized. Here there's nothing else trying to touch the deferred, so it will simply yield the 'hello world' response after 1000 milliseconds.

(defn delayed-hello-world-handler
  [req]
  (d/timeout!
    (d/deferred)
    1000
    (hello-world-handler req)))
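
The two sides of timeout! can be seen with a short interval. A minimal sketch; the keywords are placeholders for real responses:

```clojure
(require '[manifold.deferred :as d])

;; nothing ever realizes this deferred, so the timeout value wins after 50 ms
@(d/timeout! (d/deferred) 50 :timed-out)                ;=> :timed-out

;; a deferred that is already realized keeps its own value
@(d/timeout! (d/success-deferred :done) 50 :timed-out)  ;=> :done
```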

Compojure will normally dereference deferreds and return the realized value. Unfortunately, this blocks the thread. Since Aleph can accept the unrealized deferred, we extend Compojure's Renderable protocol to pass the deferred through unchanged so it can be handled asynchronously.

(extend-protocol Renderable
  clojure.lang.IDeref
  (render [d _] (d/->deferred d)))

Alternately, we can use a core.async goroutine to create our response, convert the channel it returns into a Manifold source using manifold.stream/->source, and then take the first message from it. This is entirely equivalent to the previous implementation.

(defn delayed-hello-world-handler
  [req]
  (s/take!
    (s/->source
      (a/go
        (let [_ (a/<! (a/timeout 1000))]
          (hello-world-handler req))))))
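
The channel-to-deferred conversion can be tried on its own. In this sketch a keyword stands in for the Ring response:

```clojure
(require '[clojure.core.async :as a]
         '[manifold.stream :as s])

;; a go block returns a channel that will carry the block's final value
(def response-chan
  (a/go
    (a/<! (a/timeout 50))
    :response))

;; coerce the channel to a Manifold source, and take the single value as a deferred
@(s/take! (s/->source response-chan))   ;=> :response
```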

Returns a streamed HTTP response, consisting of newline-delimited numbers every 100 milliseconds. While this would typically be represented by a lazy sequence, instead we use a Manifold stream. Similar to the use of the deferred above, this means we don't need to allocate a thread per-request.

In this handler we're assuming the string value for count is a valid number. If not, Integer.parseInt() will throw an exception, and we'll return a 500 status response with the stack trace. If we wanted to be more precise in our status, we'd wrap the parsing code with a try/catch that returns a 400 status when the count is malformed.

manifold.stream/periodically is similar to Clojure's repeatedly, except that it emits the value returned by the function at a fixed interval.

(defn streaming-numbers-handler
  [{:keys [params]}]
  (let [cnt (Integer/parseInt (get params "count" "0"))]
    {:status 200
     :headers {"content-type" "text/plain"}
     :body (let [sent (atom 0)]
             (->> (s/periodically 100 #(str (swap! sent inc) "\n"))
               (s/transform (take cnt))))}))
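
Outside of a handler, the periodically and transform combination behaves like this. The sketch uses a shorter interval and blocks on stream->seq purely for demonstration:

```clojure
(require '[manifold.stream :as s])

(def sent (atom 0))

;; emit an incrementing counter every 10 ms, and close the stream after three values
(->> (s/periodically 10 #(swap! sent inc))
  (s/transform (take 3))
  s/stream->seq
  doall)   ;=> (1 2 3)
```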

However, we can always still use lazy sequences. This is still useful when the upstream data provider exposes the stream of data as an Iterator or a similar blocking mechanism. This will, however, hold onto a thread until the sequence is exhausted.

(defn streaming-numbers-handler
  [{:keys [params]}]
  (let [cnt (Integer/parseInt (get params "count" "0"))]
    {:status 200
     :headers {"content-type" "text/plain"}
     :body (->> (range cnt)
             (map #(do (Thread/sleep 100) %))
             (map #(str % "\n")))}))

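We can also take a core.async channel and coerce it to a Manifold source via manifold.stream/->source. All three implementations of streaming-numbers-handler stream one number per line every 100 milliseconds, though the first counts from 1 while the other two count from 0.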

(defn streaming-numbers-handler
  [{:keys [params]}]
  (let [cnt (Integer/parseInt (get params "count" "0"))
        body (a/chan)]
    ;; create a goroutine that emits incrementing numbers once every 100 milliseconds
    (a/go-loop [i 0]
      (if (< i cnt)
        (let [_ (a/<! (a/timeout 100))]
          (a/>! body (str i "\n"))
          (recur (inc i)))
        (a/close! body)))
    ;; return a response containing the coerced channel as the body
    {:status 200
     :headers {"content-type" "text/plain"}
     :body (s/->source body)}))
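
All three handlers parse count the same way, so a malformed value surfaces as a 500. The more precise 400 response mentioned earlier could be sketched as follows; parse-count and wrap-valid-count are hypothetical helpers, and the error body is our own wording.

```clojure
(defn parse-count
  "Returns the parsed `count` parameter, or nil if it is not a valid integer."
  [params]
  (try
    (Integer/parseInt (get params "count" "0"))
    (catch NumberFormatException _
      nil)))

(defn wrap-valid-count
  "Calls (respond cnt) for a well-formed count, otherwise returns a 400 response."
  [respond]
  (fn [{:keys [params]}]
    (if-some [cnt (parse-count params)]
      (respond cnt)
      {:status 400
       :headers {"content-type" "text/plain"}
       :body "count must be an integer"})))

;; a stand-in for the streaming body, just to show the control flow
(def demo-handler
  (wrap-valid-count
    (fn [cnt]
      {:status 200
       :headers {"content-type" "text/plain"}
       :body (str cnt)})))

(:status (demo-handler {:params {"count" "10"}}))    ;=> 200
(:status (demo-handler {:params {"count" "ten"}}))   ;=> 400
(:status (demo-handler {:params {}}))                ;=> 200
```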

This handler defines a set of endpoints via Compojure's routes macro. Notice that above we've added the GET macro via :refer so it doesn't have to be qualified. We wrap the result in ring.middleware.params/wrap-params so that we can get the count parameter in streaming-numbers-handler.

Notice that at the bottom we define a default compojure.route/not-found handler, which will return a 404 status. If we don't do this, a call to a URI we don't recognize will return a nil response, which will cause Aleph to log an error.

(def handler
  (params/wrap-params
    (compojure/routes
      (GET "/hello"         [] hello-world-handler)
      (GET "/delayed_hello" [] delayed-hello-world-handler)
      (GET "/numbers"       [] streaming-numbers-handler)
      (route/not-found "No such page."))))

(def s (http/start-server handler {:port 10000}))

using the http client

Here we immediately dereference the response, get the :body, which is an InputStream, and coerce it to a string using byte-streams/to-string.

(-> @(http/get "http://localhost:10000/hello")
  :body
  bs/to-string)   ;=> "hello world!"

And we do the same exact thing for the delayed_hello endpoint, which returns an identical result after a second's pause.

(-> @(http/get "http://localhost:10000/delayed_hello")
  :body
  bs/to-string)   ;=> (beat) "hello world!"

Instead of dereferencing the response, we can use manifold.deferred/chain to compose operations over it. Here we dereference the final result so that we don't close the server before the response is complete, but we could also perform some side effect further down the chain if we want to completely avoid blocking operations.

@(d/chain (http/get "http://localhost:10000/delayed_hello")
   :body
   bs/to-string) ;=> (beat) "hello world!"

Here we take a line-delimited streaming response, and coerce it to a lazy sequence of strings via byte-streams/to-line-seq.

(->> @(http/get "http://localhost:10000/numbers"
        {:query-params {:count 10}})
  :body
  bs/to-line-seq
  (map #(Integer/parseInt %))
  doall)   ;=> (0 1 2 3 4 5 6 7 8 9)

By default, the :body of any response is a java.io.InputStream. This means that our consumption of the body needs to be synchronous, as shown above by coercing it to a Clojure seq. If we want the body to be asynchronous, we need to set :raw-stream? to true in the connection options of the request's connection pool.

(def raw-stream-connection-pool (http/connection-pool {:connection-options {:raw-stream? true}}))

@(d/chain
   (http/get "http://localhost:10000/numbers"
     {:query-params {:count 10}
      :pool raw-stream-connection-pool})
   :body
   #(s/map bs/to-byte-array %)
   #(s/reduce conj [] %)
   bs/to-string)

In the above example, we coerce a stream of Netty ByteBuf objects into a stream of byte[] before asynchronously accumulating them and coercing the entire collection into a String once the stream is exhausted.

This is a useful indirection, since ByteBuf objects are reference counted, and generally add a lot of complexity to the code in all but the simplest use cases. Coercing the ByteBuf objects to any other form (byte[], String, etc.) will copy the bytes and decrement the ref-count, which is almost always what's wanted.

However, if we're simply forwarding the bytes to another network socket, or want to minimize memory copies at all costs, leaving the ByteBuf objects in their native form is the way to go.
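
The map-then-reduce accumulation used for the raw stream can be tried without Netty. In this sketch plain byte arrays stand in for the ByteBuf chunks, and we decode each chunk to a String directly rather than going through byte-streams:

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

;; two chunks of a response body, as they might arrive off the wire
(def chunks
  (s/->source [(.getBytes "0\n1\n" "UTF-8")
               (.getBytes "2\n" "UTF-8")]))

;; decode each chunk as it arrives, collect them, and join once the stream is drained
@(d/chain
   (->> chunks
     (s/map #(String. ^bytes % "UTF-8"))
     (s/reduce conj []))
   #(apply str %))   ;=> "0\n1\n2\n"
```

Decoding chunk by chunk is only safe here because the sample is ASCII; a multi-byte character split across two chunks would need the bytes joined before decoding, which is what the example above does.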

(.close s)

TLS client certificate authentication

Aleph also supports TLS client certificate authentication. To make such connections, we must build a custom SSL context and pass it to a connection pool that we'll use to make HTTP requests.

Given the certificate authority's (CA's) certificate, a client certificate, and the client certificate's key in PKCS#8 format, we can build an SSL context for mutual TLS authentication.

(defn build-ssl-context
  [ca cert key]
  (-> (SslContextBuilder/forClient)
      (.trustManager (file ca))
      (.keyManager (file cert) (file key))
      .build))

To use the SSL context, we set the :ssl-context connection option on a connection pool. This allows the pool to make TLS connections with our client certificate.

(defn build-ssl-connection-pool
  [ca cert key]
  (http/connection-pool
   {:connection-options
    {:ssl-context (build-ssl-context ca cert key)}}))

(def ssl-connection-pool
  (build-ssl-connection-pool "path/to/ca.crt" "path/to/cert.crt" "path/to/key.k8"))

We can use our ssl-connection-pool to GET pages from our target endpoint by passing it as the :pool option to aleph.http/get.

@(d/chain
  (http/get
   "https://server.with.tls.client.auth"
   {:pool ssl-connection-pool})
  :body
  bs/to-string)