    (ns aleph.examples.tcp
      (:require
        [manifold.deferred :as d]
        [manifold.stream :as s]
        [clojure.edn :as edn]
        [aleph.tcp :as tcp]
        [gloss.core :as gloss]
        [gloss.io :as io]))
Complete documentation for the `aleph.tcp` namespace is available in the Aleph API documentation.
the basics
This uses Gloss, a library for defining byte formats, which are automatically compiled into encoders and streaming decoders.

Here, we define a simple protocol where each frame starts with a 32-bit integer describing the length of the string which follows. We assume the string is EDN-encoded, and so we define a pre-encoder of `pr-str`, which turns an arbitrary value into a string, and a post-decoder of `edn/read-string`, which turns the string back into a data structure.

    (def protocol
      (gloss/compile-frame
        (gloss/finite-frame :uint32
          (gloss/string :utf-8))
        pr-str
        edn/read-string))
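To make the frame layout concrete, here is a minimal sketch of the same wire format written with plain `java.nio` instead of Gloss. It is not how Gloss is implemented; the helper names `encode-frame` and `decode-frame` are hypothetical, and a big-endian length prefix is assumed. Unlike the streaming decoder Gloss produces, this sketch only looks at a single byte array and reports `:incomplete` when the frame has not fully arrived.

```clojure
(require '[clojure.edn :as edn])
(import '[java.nio ByteBuffer]
        '[java.nio.charset StandardCharsets])

(defn encode-frame
  "Hypothetical helper: returns a byte array holding a 4-byte length
  (big-endian, assumed) followed by the UTF-8 bytes of the EDN text of `value`."
  [value]
  (let [^bytes payload (.getBytes ^String (pr-str value) StandardCharsets/UTF_8)
        buf            (ByteBuffer/allocate (+ 4 (alength payload)))]
    (.putInt buf (alength payload))
    (.put buf payload)
    (.array buf)))

(defn decode-frame
  "Hypothetical helper: reads one frame from the start of `bs`. Returns the
  decoded value, or :incomplete when not enough bytes are available yet."
  [^bytes bs]
  (let [buf (ByteBuffer/wrap bs)]
    (if (< (.remaining buf) 4)
      :incomplete
      (let [len (.getInt buf)]
        (if (< (.remaining buf) len)
          :incomplete
          (let [^bytes payload (byte-array len)]
            (.get buf payload)
            (edn/read-string (String. payload StandardCharsets/UTF_8))))))))
```

Round-tripping a value with `(decode-frame (encode-frame {:a 1}))` returns the original map, while a truncated byte array yields `:incomplete`, which is the situation a streaming decoder has to handle when TCP splits a message.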
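This function takes a raw TCP duplex stream, which represents bidirectional communication via a single stream. Messages from the remote endpoint can be consumed via `take!`, and messages can be sent to the remote endpoint via `put!`. It returns a duplex stream which takes and emits arbitrary Clojure data, using the protocol defined above.

First, we define a connection between `out` and the raw stream, which encodes every message from `out` before passing it on to the raw stream.

Then, we `splice` together a separate sink and source so that they can be presented as a single duplex stream: `out` is the sink, and the source is a decoded view of the incoming stream, created via `io/decode-stream`.

    (defn wrap-duplex-stream
      [protocol s]
      (let [out (s/stream)]
        (s/connect
          (s/map #(io/encode protocol %) out)
          s)
        (s/splice
          out
          (io/decode-stream s protocol))))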
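The call to `tcp/client` returns a deferred which yields a duplex stream for sending and receiving bytes. We compose over this value asynchronously using `d/chain`, which waits for the client to be realized and then passes it into `wrap-duplex-stream`. The call to `d/chain` returns immediately with a deferred representing the eventual wrapped stream.

    (defn client
      [host port]
      (d/chain (tcp/client {:host host, :port port})
        #(wrap-duplex-stream protocol %)))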
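Takes a two-argument `handler` function, which receives a stream and information about the connection, and sets up message handling for the stream. The raw stream is wrapped in the Gloss protocol before being passed into `handler`.

    (defn start-server
      [handler port]
      (tcp/start-server
        (fn [s info]
          (handler (wrap-duplex-stream protocol s) info))
        {:port port}))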
echo servers
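This creates a handler which will apply `f` to any incoming message and immediately send back the result. Notice that we are connecting `s` to itself; since it is a duplex stream, this is simply an easy way to create an echo server.

    (defn fast-echo-handler
      [f]
      (fn [s info]
        (s/connect
          (s/map f s)
          s)))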
demonstration
We start a server `s` which will return incremented numbers.

    (def s
      (start-server
        (fast-echo-handler inc)
        10000))
We connect a client to the server, dereferencing the deferred value returned such that `c` is simply a duplex stream that takes and emits Clojure values.

    (def c @(client "localhost" 10000))
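We `put!` a value into the stream, which is encoded to bytes and sent over TCP. Since TCP is a streaming protocol, the message is not guaranteed to arrive as a single packet, so the server must be robust to split messages. Since both client and server use Gloss codecs, this is handled automatically.

    @(s/put! c 1)  ; => true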
The message is parsed by the server, and the response is sent, which again may be split while in transit between the server and client. The bytes are consumed and parsed by `wrap-duplex-stream`, and the decoded message can be received via `take!`.

    @(s/take! c)   ; => 2
The server implements `java.io.Closeable`, and can be stopped by calling `close()`.

    (.close s)
end demonstration
While we can do trivial computation on the same thread we receive messages, longer computation or blocking operations should be done elsewhere. To accomplish this, we need something a little more complicated than connecting a stream to itself.

Here, we define an asynchronous loop via `d/loop`. In this loop, we take a message from the stream, transform it on another thread with `d/future`, send the result back, and then repeat.

    (defn slow-echo-handler
      [f]
      (fn [s info]
        (d/loop []

          ;; take a message, and define a default value that tells us if the connection is closed
          (-> (s/take! s ::none)

            (d/chain

              ;; first, check if there even was a message, and then transform it on another thread
              (fn [msg]
                (if (= ::none msg)
                  ::none
                  (d/future (f msg))))

              ;; once the transformation is complete, write it back to the client
              (fn [msg']
                (when-not (= ::none msg')
                  (s/put! s msg')))

              ;; if we were successful in our response, recur and repeat
              (fn [result]
                (when result
                  (d/recur))))

            ;; if there were any issues on the far end, send a stringified exception back
            ;; and close the connection
            (d/catch
              (fn [ex]
                (s/put! s (str "ERROR: " ex))
                (s/close! s)))))))
Alternately, we use `d/let-flow` to implement the composition of these asynchronous values. It is more concise, at the cost of being less explicit.

    (defn slow-echo-handler
      [f]
      (fn [s info]
        (d/loop []
          (->
            (d/let-flow [msg (s/take! s ::none)]
              (when-not (= ::none msg)
                (d/let-flow [msg'   (d/future (f msg))
                             result (s/put! s msg')]
                  (when result
                    (d/recur)))))
            (d/catch
              (fn [ex]
                (s/put! s (str "ERROR: " ex))
                (s/close! s)))))))
demonstration
We start a server `s` which will return incremented numbers, slowly.

    (def s
      (start-server
        (slow-echo-handler
          (fn [x]
            (Thread/sleep 1000)
            (inc x)))
        10000))
    (def c @(client "localhost" 10000))

    @(s/put! c 1)  ; => true

    @(s/take! c)   ; => 2

    (.close s)
end demonstration
    (ns aleph.examples.udp
      (:require
        [manifold.deferred :as d]
        [manifold.stream :as s]
        [aleph.udp :as udp]
        [clojure.string :as str]
        [byte-streams :as bs]))
Full documentation for the `aleph.udp` namespace is available in the Aleph API documentation.
This example is a very pared-down version of statsd, which takes in UDP packets from the entire system and does periodic rollups that are typically forwarded to Graphite.
    (def server-port 10002)
This creates a socket without a port, which we can only use to send messages. We dereference this, since it will typically complete immediately.

    (def client-socket @(udp/socket {}))
This encodes a message in the typical statsd format, which is two strings, `metric` and `value`, delimited by a colon.

    (defn send-metric!
      [metric ^long value]
      (s/put! client-socket
        {:host "localhost"
         :port server-port

         ;; The UDP contents can be anything which byte-streams can coerce to a byte-array. If
         ;; the combined length of the metric and value were to exceed 65536 bytes, this would
         ;; fail, and `send-metric!` would return a deferred value that yields an error.
         :message (str metric ":" value)}))
This is the inverse operation of `send-metric!`: it takes the message, splits it on the colon delimiter, and parses the `value` as a long.

    (defn parse-statsd-packet
      [{:keys [message]}]
      (let [message        (bs/to-string message)
            [metric value] (str/split message #":")]
        [metric (Long/parseLong value)]))
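The encode and parse steps can be checked against each other without any sockets. The following is a minimal sketch that works on plain strings only; `format-metric` and `parse-metric` are hypothetical names that mirror the string handling in `send-metric!` and `parse-statsd-packet`, leaving out the byte-streams coercion.

```clojure
(require '[clojure.string :as str])

(defn format-metric
  "Hypothetical helper: builds the `metric:value` string that is sent as the UDP payload."
  [metric value]
  (str metric ":" value))

(defn parse-metric
  "Hypothetical helper: splits a `metric:value` string and parses the value as a long."
  [^String message]
  (let [[metric ^String value] (str/split message #":")]
    [metric (Long/parseLong value)]))

;; A round trip returns the original pair.
(parse-metric (format-metric "requests" 42))
```

Because the split is on every colon, this simple format assumes metric names never contain a colon themselves.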
An atomic operation which returns the previous value, and sets it to `new-val`.

    (defn get-and-set!
      [a new-val]
      (let [old-val @a]
        (if (compare-and-set! a old-val new-val)
          old-val
          (recur a new-val))))
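A short usage sketch of this compare-and-set loop, with the definition repeated so the snippet stands alone. The atom `counts` and the var `flushed` are illustrative names only. On Clojure 1.9 or later, the built-in `reset-vals!` offers similar behaviour, returning both the old and the new value as a vector.

```clojure
(defn get-and-set!
  [a new-val]
  (let [old-val @a]
    ;; retry until no other thread changed the atom between the read and the swap
    (if (compare-and-set! a old-val new-val)
      old-val
      (recur a new-val))))

;; illustrative state: two accumulated metrics
(def counts (atom {"a" 1, "b" 2}))

;; take everything accumulated so far and reset the atom in one atomic step
(def flushed (get-and-set! counts {}))

;; `reset-vals!` (Clojure 1.9+) returns [old-value new-value]
(def old-and-new (reset-vals! counts {"c" 3}))
```

After this runs, `flushed` holds the two original metrics, and `old-and-new` pairs the empty map left by the flush with the newly set map.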
    (defn start-statsd-server
      []
      (let [accumulator   (atom {})
            server-socket @(udp/socket {:port server-port})

            ;; Once a second, take all the values that have accumulated, `put!` them out, and
            ;; clear the accumulator.
            metric-stream (s/periodically 1000 #(get-and-set! accumulator {}))]

        ;; Listens on a socket, parses each incoming message, and increments the appropriate metric.
        (->> server-socket
          (s/map parse-statsd-packet)
          (s/consume
            (fn [[metric value]]
              (swap! accumulator update metric #(+ (or % 0) value)))))

        ;; If `metric-stream` is closed, close the associated socket.
        (s/on-drained metric-stream #(s/close! server-socket))

        metric-stream))
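The accumulation step inside `start-statsd-server` can be tried on its own, without a socket. This sketch uses a hypothetical `record-metric!` helper and swaps the `#(+ (or % 0) value)` function for `(fnil + 0)`, which is an equivalent way to treat a missing metric as zero; it is a stylistic alternative, not what the example above uses.

```clojure
;; illustrative accumulator, standing in for the one created inside `start-statsd-server`
(def accumulator (atom {}))

(defn record-metric!
  "Hypothetical helper: adds `value` to the running total for `metric`,
  starting from 0 when the metric has not been seen yet."
  [metric value]
  (swap! accumulator update metric (fnil + 0) value))

(record-metric! "a" 1)
(record-metric! "a" 4)
(record-metric! "b" 2)
```

Repeated values for the same metric are summed, so the accumulator ends up with one total per metric until the next rollup clears it.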
    (def server (start-statsd-server))

    (send-metric! "a" 1)
    (send-metric! "b" 2)

    @(s/take! server)   ; => {"a" 1, "b" 2}

    (s/close! server)
    (ns aleph.examples.websocket
      (:require
        [compojure.core :as compojure :refer [GET]]
        [ring.middleware.params :as params]
        [compojure.route :as route]
        [aleph.http :as http]
        [byte-streams :as bs]
        [manifold.stream :as s]
        [manifold.deferred :as d]
        [manifold.bus :as bus]
        [clojure.core.async :as a]))
    (def non-websocket-request
      {:status 400
       :headers {"content-type" "application/text"}
       :body "Expected a websocket request."})
This handler sets up a websocket connection, and then proceeds to echo back every message it receives from the client. The value yielded by `http/websocket-connection` is a duplex stream, which represents communication to and from the client. Therefore, all we need to do in order to echo the messages is connect the stream to itself.

Since any request it gets may not be a valid handshake for a websocket request, we need to handle that case appropriately.

    (defn echo-handler
      [req]
      (if-let [socket (try
                        @(http/websocket-connection req)
                        (catch Exception e
                          nil))]
        (s/connect socket socket)
        non-websocket-request))
The previous handler blocks until the websocket handshake completes, which unnecessarily takes up a thread. This accomplishes the same as above, but asynchronously.

    (defn echo-handler
      [req]
      (-> (http/websocket-connection req)
        (d/chain
          (fn [socket]
            (s/connect socket socket)))
        (d/catch
          (fn [_]
            non-websocket-request))))
This is another asynchronous handler, but uses `d/let-flow` instead of `d/chain` to define the handler in a way that more closely resembles the synchronous version.

    (defn echo-handler
      [req]
      (->
        (d/let-flow [socket (http/websocket-connection req)]
          (s/connect socket socket))
        (d/catch
          (fn [_]
            non-websocket-request))))
To represent all the different chat rooms, we use an event bus, which is a simple implementation of the publish/subscribe model.

    (def chatrooms (bus/event-bus))
    (defn chat-handler
      [req]
      (d/let-flow [conn (d/catch
                          (http/websocket-connection req)
                          (fn [_] nil))]
        (if-not conn

          ;; if it wasn't a valid websocket handshake, return an error
          non-websocket-request

          ;; otherwise, take the first two messages, which give us the chatroom and name
          (d/let-flow [room (s/take! conn)
                       name (s/take! conn)]

            ;; take all messages from the chatroom, and feed them to the client
            (s/connect
              (bus/subscribe chatrooms room)
              conn)

            ;; take all messages from the client, prepend the name, and publish it to the room
            (s/consume
              #(bus/publish! chatrooms room %)
              (->> conn
                (s/map #(str name ": " %))
                (s/buffer 100)))

            ;; Compojure expects some sort of HTTP response, so just give it `nil`
            nil))))
    (def handler
      (params/wrap-params
        (compojure/routes
          (GET "/echo" [] echo-handler)
          (GET "/chat" [] chat-handler)
          (route/not-found "No such page."))))

    (def s
      (http/start-server handler {:port 10000}))
Here we `put!` ten messages to the echo endpoint, and read them back again.

    (let [conn @(http/websocket-client "ws://localhost:10000/echo")]

      (s/put-all! conn
        (->> 10 range (map str)))

      (->> conn
        (s/transform (take 10))
        s/stream->seq
        doall))    ;=> ("0" "1" "2" "3" "4" "5" "6" "7" "8" "9")
Here we create two clients, and have them speak to each other.

    (let [conn1 @(http/websocket-client "ws://localhost:10000/chat")
          conn2 @(http/websocket-client "ws://localhost:10000/chat")]

      ;; sign our two users in
      (s/put-all! conn1 ["shoes and ships" "Alice"])
      (s/put-all! conn2 ["shoes and ships" "Bob"])

      (s/put! conn1 "hello")

      @(s/take! conn1)   ;=> "Alice: hello"
      @(s/take! conn2)   ;=> "Alice: hello"

      (s/put! conn2 "hi!")

      @(s/take! conn1)   ;=> "Bob: hi!"
      @(s/take! conn2)   ;=> "Bob: hi!"
      )
    (.close s)
    (ns aleph.examples.http
      (:import
        [io.netty.handler.ssl SslContextBuilder])
      (:require
        [compojure.core :as compojure :refer [GET]]
        [ring.middleware.params :as params]
        [compojure.route :as route]
        [compojure.response :refer [Renderable]]
        [aleph.http :as http]
        [byte-streams :as bs]
        [manifold.stream :as s]
        [manifold.deferred :as d]
        [clojure.core.async :as a]
        [clojure.java.io :refer [file]]))
For HTTP, Aleph implements a superset of the Ring spec, which means it can be used as a drop-in replacement for pretty much any other Clojure webserver. In order to allow for asynchronous responses, however, it allows for the use of Manifold deferreds and streams. Uses of both will be illustrated below.
Complete documentation for the `aleph.http` namespace is available in the Aleph API documentation.
building servers
A basic Ring handler which immediately returns 'hello world'.

    (defn hello-world-handler
      [req]
      {:status 200
       :headers {"content-type" "text/plain"}
       :body "hello world!"})
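A non-standard response handler that returns a deferred which yields a Ring response after one second. In a typical Ring-compliant server, this would require holding onto a thread via `Thread/sleep` or a similar mechanism, but the use of a deferred allows the thread to be released immediately, before any response is available.

This is an atypical usage of `d/timeout!`, but because the deferred it wraps will never be realized on its own, we can be sure the timeout value is what gets delivered after one second.

    (defn delayed-hello-world-handler
      [req]
      (d/timeout!
        (d/deferred)
        1000
        (hello-world-handler req)))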
Compojure will normally dereference deferreds and return the realized value. Unfortunately, this blocks the thread. Since Aleph can accept the unrealized deferred, we extend Compojure's `Renderable` protocol so that anything dereferenceable is converted to a Manifold deferred and handed to Aleph without blocking.

    (extend-protocol Renderable
      clojure.lang.IDeref
      (render [d _] (d/->deferred d)))
Alternately, we can use a core.async goroutine to create our response, convert the channel it returns using `s/->source`, and then take the first message from it.

    (defn delayed-hello-world-handler
      [req]
      (s/take!
        (s/->source
          (a/go
            (let [_ (a/<! (a/timeout 1000))]
              (hello-world-handler req))))))
Returns a streamed HTTP response, consisting of newline-delimited numbers every 100 milliseconds. While this would typically be represented by a lazy sequence, instead we use a Manifold stream. Similar to the use of the deferred above, this means we don't need to allocate a thread per-request.

In this handler we're assuming the string value for `count` is a valid number; if it is not, `Integer/parseInt` will throw an exception.

    (defn streaming-numbers-handler
      [{:keys [params]}]
      (let [cnt (Integer/parseInt (get params "count" "0"))]
        {:status 200
         :headers {"content-type" "text/plain"}
         :body (let [sent (atom 0)]
                 (->> (s/periodically 100 #(str (swap! sent inc) "\n"))
                   (s/transform (take cnt))))}))
However, we can always still use lazy sequences. This is still useful when the upstream data provider exposes the stream of data as an iterator or a similar blocking mechanism. It does, however, hold onto a thread until the sequence is exhausted.

    (defn streaming-numbers-handler
      [{:keys [params]}]
      (let [cnt (Integer/parseInt (get params "count" "0"))]
        {:status 200
         :headers {"content-type" "text/plain"}
         :body (->> (range cnt)
                 (map #(do (Thread/sleep 100) %))
                 (map #(str % "\n")))}))
We can also take a core.async channel, coerce it to a Manifold source via `s/->source`, and use it as the response body.

    (defn streaming-numbers-handler
      [{:keys [params]}]
      (let [cnt  (Integer/parseInt (get params "count" "0"))
            body (a/chan)]

        ;; create a goroutine that emits incrementing numbers once every 100 milliseconds
        (a/go-loop [i 0]
          (if (< i cnt)
            (let [_ (a/<! (a/timeout 100))]
              (a/>! body (str i "\n"))
              (recur (inc i)))
            (a/close! body)))

        ;; return a response containing the coerced channel as the body
        {:status 200
         :headers {"content-type" "text/plain"}
         :body (s/->source body)}))
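Each `streaming-numbers-handler` variant parses `count` with `Integer/parseInt`, which throws on malformed input. If a more precise status is wanted, the parsing can be wrapped in a try/catch. The following is a minimal sketch of that idea using plain Ring-style maps; `parse-count` and `count-or-bad-request` are hypothetical helpers, and the wording of the error body is an assumption.

```clojure
(defn parse-count
  "Hypothetical helper: returns the parsed integer, or nil when `s` is not a valid integer."
  [^String s]
  (try
    (Integer/parseInt s)
    (catch NumberFormatException _
      nil)))

(defn count-or-bad-request
  "Hypothetical helper: returns {:count n} for a valid `count` parameter,
  or a 400 response map describing the problem."
  [params]
  (if-let [cnt (parse-count (get params "count" "0"))]
    {:count cnt}
    {:status 400
     :headers {"content-type" "text/plain"}
     :body "The `count` parameter must be an integer."}))
```

A handler could call `count-or-bad-request` first and return the map directly whenever it contains a `:status`, building the streaming body only when a `:count` is present.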
This handler defines a set of endpoints via Compojure's `routes` and `GET`. Notice that at the bottom we define a default `route/not-found` handler, which responds to any request that matches none of the routes above.

    (def handler
      (params/wrap-params
        (compojure/routes
          (GET "/hello"         [] hello-world-handler)
          (GET "/delayed_hello" [] delayed-hello-world-handler)
          (GET "/numbers"       [] streaming-numbers-handler)
          (route/not-found "No such page."))))

    (def s (http/start-server handler {:port 10000}))
using the http client
Here we immediately dereference the response, get the `:body`, which is an `InputStream`, and coerce it to a string using `bs/to-string`.

    (-> @(http/get "http://localhost:10000/hello")
      :body
      bs/to-string)   ;=> "hello world!"
And we do the same exact thing for the `delayed_hello` endpoint, which returns an identical result after a second's pause.

    (-> @(http/get "http://localhost:10000/delayed_hello")
      :body
      bs/to-string)   ;=> (beat) "hello world!"
Instead of dereferencing the response, we can use `d/chain` to compose operations over it. Here we dereference only the final result.

    @(d/chain (http/get "http://localhost:10000/delayed_hello")
       :body
       bs/to-string)   ;=> (beat) "hello world!"
Here we take a line-delimited streaming response, and coerce it to a lazy sequence of strings via `bs/to-line-seq`.

    (->> @(http/get "http://localhost:10000/numbers"
            {:query-params {:count 10}})
      :body
      bs/to-line-seq
      (map #(Integer/parseInt %))
      doall)   ;=> (0 1 2 3 4 5 6 7 8 9)
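By default, the `:body` of a response is a `java.io.InputStream`, which means the body has to be consumed synchronously, as in the line-sequence example above. To consume the body asynchronously, we set `:raw-stream?` to `true` on the connection pool used for the request.

    (def raw-stream-connection-pool
      (http/connection-pool {:connection-options {:raw-stream? true}}))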
    @(d/chain
       (http/get "http://localhost:10000/numbers"
         {:query-params {:count 10}
          :pool raw-stream-connection-pool})
       :body
       #(s/map bs/to-byte-array %)
       #(s/reduce conj [] %)
       bs/to-string)
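In the above example, we coerce a stream of Netty `ByteBuf` objects into a stream of byte arrays, accumulate them asynchronously, and coerce the whole collection into a string once the stream is exhausted.

This is a useful indirection, since `ByteBuf` objects have custom reference-counting semantics and can be tricky to work with.

However, if we're simply forwarding the bytes to another network socket, or want to minimize memory copies at all costs, leaving the `ByteBuf` objects in their native form is the way to go.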
    (.close s)
TLS client certificate authentication
Aleph also supports TLS client certificate authentication. To make such connections, we must build a custom SSL context and pass it to a connection pool that we'll use to make HTTP requests.
Given the certificate authority (CA)'s certificate, a client certificate, and the key for the client certificate in PKCS#8 format, we can build an SSL context for mutual TLS authentication.

    (defn build-ssl-context
      [ca cert key]
      (-> (SslContextBuilder/forClient)
        (.trustManager (file ca))
        (.keyManager (file cert) (file key))
        .build))
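To use the SSL context, we set the `:ssl-context` connection option on a connection pool. This allows the pool to make TLS connections with our client certificate.

    (defn build-ssl-connection-pool
      [ca cert key]
      (http/connection-pool
        {:connection-options
         {:ssl-context (build-ssl-context ca cert key)}}))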
    (def ssl-connection-pool
      (build-ssl-connection-pool "path/to/ca.crt" "path/to/cert.crt" "path/to/key.k8"))
We can use our `ssl-connection-pool` to fetch pages from the target endpoint by passing it as the `:pool` option to `http/get`.

    @(d/chain
       (http/get
         "https://server.with.tls.client.auth"
         {:pool ssl-connection-pool})
       :body
       bs/to-string)