dependencies
| aleph | 0.4.6 |
| gloss | 0.2.6 |
| compojure | 1.6.1 |
| org.clojure/clojure | 1.9.0 |
| org.clojure/core.async | 0.4.474 |
|
| |
| |
| ( ns aleph.examples.tcp
( :require
[ manifold.deferred :as d ]
[ manifold.stream :as s ]
[ clojure.edn :as edn ]
[ aleph.tcp :as tcp ]
[ gloss.core :as gloss ]
[ gloss.io :as io ] ) )
|
|
Complete documentation for the aleph.tcp namespace can be found here.
| |
the basics
| |
This uses Gloss, a library for defining byte formats, which are automatically
compiled into encoders and streaming decoders.
Here, we define a simple protocol where each frame starts with a 32-bit integer describing
the length of the string which follows. We assume the string is EDN-encoded, and so we
define a pre-encoder of pr-str , which will turn our arbitrary value into a string, and
a post-decoder of clojure.edn/read-string , which will transform our string into a data
structure.
| ( def protocol
( gloss/compile-frame
( gloss/finite-frame :uint32
( gloss/string :utf-8 ) )
pr-str
edn/read-string ) )
|
|
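To make the frame layout concrete, here is a minimal sketch that builds and parses the same kind of frame by hand, without Gloss. The namespace and the helpers encode-frame and decode-frame are hypothetical names introduced for illustration. The sketch assumes a big-endian length prefix and handles only one complete frame at a time, so it does not cope with frames split across packets the way a Gloss decoder does.

```clojure
(ns example.framing
  (:require [clojure.edn :as edn])
  (:import [java.nio ByteBuffer]
           [java.nio.charset StandardCharsets]))

(defn encode-frame
  "Hypothetical helper: a 4-byte big-endian length prefix followed by the
  UTF-8 bytes of (pr-str value)."
  ^bytes [value]
  (let [^bytes payload (.getBytes ^String (pr-str value) StandardCharsets/UTF_8)
        buf            (ByteBuffer/allocate (+ 4 (alength payload)))]
    (.putInt buf (alength payload))
    (.put buf payload)
    (.array buf)))

(defn decode-frame
  "Hypothetical helper: reads the length prefix, then parses that many bytes
  as an EDN string. Expects exactly one complete frame."
  [^bytes frame]
  (let [buf            (ByteBuffer/wrap frame)
        len            (.getInt buf)
        ^bytes payload (byte-array len)]
    (.get buf payload)
    (edn/read-string (String. payload StandardCharsets/UTF_8))))

;; Round trip: the value survives encoding and decoding.
(decode-frame (encode-frame {:a [1 2 3]}))
```

The pre-encoder and post-decoder in the Gloss protocol play the roles of pr-str and edn/read-string here; the length prefix is what lets a streaming decoder know where one message ends and the next begins.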
This function takes a raw TCP duplex stream which represents bidirectional communication
via a single stream. Messages from the remote endpoint can be consumed via take! , and
messages can be sent to the remote endpoint via put! . It returns a duplex stream which
will take and emit arbitrary Clojure data, via the protocol we've just defined.
First, we define a connection between out and the raw stream, which will take all the
messages from out and encode them before passing them on to the raw stream.
Then, we splice together a separate sink and source, so that they can be presented as a
single duplex stream. We've already defined our sink, which will encode all outgoing
messages. We must combine that with a decoded view of the incoming stream, which is
accomplished via gloss.io/decode-stream .
| ( defn wrap-duplex-stream
[ protocol s ]
( let [ out ( s/stream ) ]
( s/connect
( s/map #( io/encode protocol % ) out )
s )
( s/splice
out
( io/decode-stream s protocol ) ) ) )
|
|
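The behaviour of manifold.stream/splice can be seen in isolation with two in-memory streams. This is a small sketch, not part of the TCP example; the namespace and the names sink, source and duplex are chosen for illustration. Values put into the spliced stream land in the sink, and values taken from it come from the source.

```clojure
(ns example.splice
  (:require [manifold.stream :as s]))

(def result
  (let [sink   (s/stream 1)   ; buffered so that put! completes without a taker
        source (s/stream 1)
        duplex (s/splice sink source)]
    @(s/put! duplex :outgoing)   ; goes to the sink
    @(s/put! source :incoming)   ; becomes available on the duplex stream
    [@(s/take! sink) @(s/take! duplex)]))
```

In wrap-duplex-stream the sink is out, whose contents are encoded and forwarded to the socket, and the source is the decoded view of the socket.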
The call to aleph.tcp/client returns a deferred, which will yield a duplex stream that
can be used to both send and receive bytes. We asynchronously compose over this value using
manifold.deferred/chain , which will wait for the client to be realized, and then pass
the client into wrap-duplex-stream . The call to chain will return immediately with a
deferred value representing the eventual wrapped stream.
| ( defn client
[ host port ]
( d/chain ( tcp/client { :host host , :port port } )
#( wrap-duplex-stream protocol % ) ) )
|
|
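The way chain returns immediately can be sketched without any networking. The deferred named pending below stands in for the not-yet-connected client and is purely illustrative.

```clojure
(ns example.chain
  (:require [manifold.deferred :as d]))

;; Stands in for a value that is not available yet.
(def pending (d/deferred))

;; chain returns at once; the functions run only when pending is realized.
(def chained (d/chain pending inc #(* 2 %)))

(def realized-before? (d/realized? chained))   ; => false

(d/success! pending 1)

(def final-value @chained)                     ; => 4
```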
Takes a two-argument handler function, which takes a stream and information about the
connection, and sets up message handling for the stream. The raw stream is wrapped in the
Gloss protocol before being passed into handler .
| ( defn start-server
[ handler port ]
( tcp/start-server
( fn [ s info ]
( handler ( wrap-duplex-stream protocol s ) info ) )
{ :port port } ) )
|
|
echo servers
| |
This creates a handler which will apply f to any incoming message, and immediately
send back the result. Notice that we are connecting s to itself, but since it is a duplex
stream this is simply an easy way to create an echo server.
| ( defn fast-echo-handler
[ f ]
( fn [ s info ]
( s/connect
( s/map f s )
s ) ) )
|
|
demonstration
| |
We start a server s which will return incremented numbers.
| ( def s
( start-server
( fast-echo-handler inc )
10000 ) )
|
|
We connect a client to the server, dereferencing the deferred value returned such that c
is simply a duplex stream that takes and emits Clojure values.
| ( def c @ ( client "localhost" 10000 ) )
|
|
We put! a value into the stream, which is encoded to bytes and sent as a TCP packet. Since
TCP is a streaming protocol, it is not guaranteed to arrive as a single packet, so the server
must be robust to split messages. Since both client and server are using Gloss codecs, this
is automatic.
| |
The message is parsed by the server, and the response is sent, which again may be split
while in transit between the server and client. The bytes are consumed and parsed by
wrap-duplex-stream , and the decoded message can be received via take! .
| |
The server implements java.io.Closeable , and can be stopped by calling close() .
| |
end demonstration
| |
While we can do trivial computation on the same thread we receive messages, longer computation
or blocking operations should be done elsewhere. To accomplish this, we need something a
little more complicated than connecting a stream to itself.
Here, we define an asynchronous loop via manifold.deferred/loop . In this loop, we take a
message from the stream, transform it on another thread with manifold.deferred/future ,
send it back, and then repeat.
| ( defn slow-echo-handler
[ f ]
( fn [ s info ]
( d/loop [ ]
( -> ( s/take! s ::none )
( d/chain
( fn [ msg ]
( if ( = ::none msg )
::none
( d/future ( f msg ) ) ) )
( fn [ msg' ]
( when-not ( = ::none msg' )
( s/put! s msg' ) ) )
( fn [ result ]
( when result
( d/recur ) ) ) )
( d/catch
( fn [ ex ]
( s/put! s ( str "ERROR: " ex ) )
( s/close! s ) ) ) ) ) ) )
|
|
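The same loop shape, stripped of the network stream, looks like the following sketch. The function squares-async is a hypothetical name; it computes each square on another thread with manifold.deferred/future and uses d/recur from inside a chain callback, just as the handler above does.

```clojure
(ns example.loop
  (:require [manifold.deferred :as d]))

(defn squares-async
  "Hypothetical example: returns a deferred yielding the squares of 0..n-1,
  computing each one on another thread."
  [n]
  (d/loop [i 0, acc []]
    (if (< i n)
      (d/chain (d/future (* i i))
        (fn [sq] (d/recur (inc i) (conj acc sq))))
      acc)))

(def squares @(squares-async 4))   ; => [0 1 4 9]
```

The loop never blocks a thread while waiting: each iteration is scheduled only once the previous deferred has been realized.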
Alternately, we use manifold.deferred/let-flow to implement the composition of these
asynchronous values. It is certainly more concise, but at the cost of being less explicit.
| ( defn slow-echo-handler
[ f ]
( fn [ s info ]
( d/loop [ ]
( ->
( d/let-flow [ msg ( s/take! s ::none ) ]
( when-not ( = ::none msg )
( d/let-flow [ msg' ( d/future ( f msg ) )
result ( s/put! s msg' ) ]
( when result
( d/recur ) ) ) ) )
( d/catch
( fn [ ex ]
( s/put! s ( str "ERROR: " ex ) )
( s/close! s ) ) ) ) ) ) )
|
|
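As a minimal illustration of let-flow on its own, the sketch below binds two deferred values and uses them as if they were already realized. The names a, b and total are illustrative only.

```clojure
(ns example.let-flow
  (:require [manifold.deferred :as d]))

;; let-flow returns a deferred; the body runs once both bindings are realized.
(def total
  @(d/let-flow [a (d/future (Thread/sleep 50) 1)
                b (d/future (Thread/sleep 50) 2)]
     (+ a b)))   ; => 3
```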
demonstration
| |
We start a server s which will return incremented numbers, slowly.
| ( def s
( start-server
( slow-echo-handler
( fn [ x ]
( Thread/sleep 1000 )
( inc x ) ) )
10000 ) )
|
|
| ( def c @ ( client "localhost" 10000 ) )
|
|
| |
| |
| |
end demonstration
| |
| |
| |
| ( ns aleph.examples.udp
( :require
[ manifold.deferred :as d ]
[ manifold.stream :as s ]
[ aleph.udp :as udp ]
[ clojure.string :as str ]
[ byte-streams :as bs ] ) )
|
|
Full documentation for the aleph.udp namespace can be found here.
| |
This example is a very pared-down version of statsd, which
takes in UDP packets from the entire system, and does periodic rollups that are typically
forwarded to Graphite.
| |
| |
This creates a socket without a port, which we can only use to send messages. We dereference
this, since it will typically complete immediately.
| ( def client-socket @ ( udp/socket { } ) )
|
|
This encodes a message in the typical statsd format, which is two strings, metric and
value , delimited by a colon.
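| ;; Port shared by the client and the statsd server. The value is an assumption:
;; the definition of server-port does not appear in this listing.
(def server-port 10002)

(defn send-metric!
  [metric ^long value]
  (s/put! client-socket
    {:host "localhost"
     :port server-port
     :message (str metric ":" value)}))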
|
|
This is the inverse operation of send-metric! , taking the message, splitting it on the
colon delimiter, and parsing the value .
| ( defn parse-statsd-packet
[ { :keys [ message ] } ]
( let [ message ( bs/to-string message )
[ metric value ] ( str/split message #":" ) ]
[ metric ( Long/parseLong value ) ] ) )
|
|
An atomic operation which returns the previous value, and sets it to new-val .
| ( defn get-and-set!
[ a new-val ]
( let [ old-val @ a ]
( if ( compare-and-set! a old-val new-val )
old-val
( recur a new-val ) ) ) )
|
|
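Since this example targets Clojure 1.9.0, the same effect can also be had from clojure.core/reset-vals!, which returns the old and new values as a pair. The sketch below is an alternative for comparison, not the code used in the rest of this example; the name get-and-set-1-9! is hypothetical.

```clojure
(ns example.reset-vals)

(defn get-and-set-1-9!
  "Hypothetical variant of get-and-set! built on reset-vals! (Clojure 1.9+)."
  [a new-val]
  (first (reset-vals! a new-val)))

(def acc (atom {"a" 3}))

(def drained (get-and-set-1-9! acc {}))   ; => {"a" 3}
```

After the call, the atom holds the new value and the caller holds the previous one, with no window in which an update could be lost.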
| (defn start-statsd-server
  []
  (let [accumulator   (atom {})
        server-socket @(udp/socket {:port server-port})
        ;; once a second, emit everything accumulated so far and clear the accumulator
        metric-stream (s/periodically 1000 #(get-and-set! accumulator {}))]
    ;; parse each incoming packet and add its value to the running total for its metric
    (->> server-socket
      (s/map parse-statsd-packet)
      (s/consume
        (fn [[metric value]]
          (swap! accumulator update metric #(+ (or % 0) value)))))
    ;; when metric-stream is closed and drained, close the socket as well
    (s/on-drained metric-stream #(s/close! server-socket))
    metric-stream))
|
|
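The accumulation step can be tried without sockets. In this sketch the function accumulate is a hypothetical stand-in for the swap! call above, applied to already-parsed [metric value] pairs; fnil supplies the zero for metrics seen for the first time.

```clojure
(ns example.rollup)

(defn accumulate
  "Hypothetical helper: adds value to the running total for metric."
  [totals [metric value]]
  (update totals metric (fnil + 0) value))

(def totals
  (reduce accumulate {} [["a" 1] ["b" 2] ["a" 5]]))   ; => {"a" 6, "b" 2}
```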
| ( def server ( start-statsd-server ) )
|
|
| ( send-metric! "a" 1 )
( send-metric! "b" 2 )
|
|
| |
| |
| |
| |
| ( ns aleph.examples.websocket
( :require
[ compojure.core :as compojure :refer [ GET ] ]
[ ring.middleware.params :as params ]
[ compojure.route :as route ]
[ aleph.http :as http ]
[ byte-streams :as bs ]
[ manifold.stream :as s ]
[ manifold.deferred :as d ]
[ manifold.bus :as bus ]
[ clojure.core.async :as a ] ) )
|
|
| ( def non-websocket-request
{ :status 400
:headers { "content-type" "application/text" }
:body "Expected a websocket request." } )
|
|
This handler sets up a websocket connection, and then proceeds to echo back every message
it receives from the client. The value yielded by websocket-connection is a **duplex
stream**, which represents communication to and from the client. Therefore, all we need to
do in order to echo the messages is connect the stream to itself.
Since any request it gets may not be a valid handshake for a websocket request, we need to
handle that case appropriately.
| ( defn echo-handler
[ req ]
( if-let [ socket ( try
@ ( http/websocket-connection req )
( catch Exception e
nil ) ) ]
( s/connect socket socket )
non-websocket-request ) )
|
|
The previous handler blocks until the websocket handshake completes, which unnecessarily
takes up a thread. This accomplishes the same as above, but asynchronously.
| ( defn echo-handler
[ req ]
( -> ( http/websocket-connection req )
( d/chain
( fn [ socket ]
( s/connect socket socket ) ) )
( d/catch
( fn [ _ ]
non-websocket-request ) ) ) )
|
|
This is another asynchronous handler, but uses let-flow instead of chain to define the
handler in a way that at least somewhat resembles the synchronous handler.
| ( defn echo-handler
[ req ]
( ->
( d/let-flow [ socket ( http/websocket-connection req ) ]
( s/connect socket socket ) )
( d/catch
( fn [ _ ]
non-websocket-request ) ) ) )
|
|
To represent all the different chat rooms, we use an event bus, which is a simple
implementation of the publish/subscribe model.
| ( def chatrooms ( bus/event-bus ) )
|
|
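A minimal sketch of the bus on its own, with illustrative names (rooms, alice, bob): each subscriber to a topic gets its own stream, and a published message is delivered to all of them.

```clojure
(ns example.bus
  (:require [manifold.bus :as bus]
            [manifold.stream :as s]))

(def rooms (bus/event-bus))

;; Two subscribers to the same topic.
(def alice (bus/subscribe rooms "lobby"))
(def bob   (bus/subscribe rooms "lobby"))

;; publish! returns a deferred; we do not wait on it here.
(bus/publish! rooms "lobby" "hello")

(def received [@(s/take! alice) @(s/take! bob)])
```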
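| (defn chat-handler
  [req]
  (d/let-flow [conn (d/catch
                      (http/websocket-connection req)
                      (fn [_] nil))]
    (if-not conn
      ;; not a valid websocket handshake, so return an error response
      non-websocket-request
      ;; otherwise the first two messages give us the chat room and the user's name
      (d/let-flow [room (s/take! conn)
                   name (s/take! conn)]
        ;; feed every message published to the room back to the client
        (s/connect
          (bus/subscribe chatrooms room)
          conn)
        ;; prepend the name to every message from the client and publish it to the room
        (s/consume
          #(bus/publish! chatrooms room %)
          (->> conn
            (s/map #(str name ": " %))
            (s/buffer 100)))
        ;; the connection is handled via the stream, so there is no response body to return
        nil))))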
|
|
| ( def handler
( params/wrap-params
( compojure/routes
( GET "/echo" [ ] echo-handler )
( GET "/chat" [ ] chat-handler )
( route/not-found "No such page." ) ) ) )
|
|
| ( def s ( http/start-server handler { :port 10000 } ) )
|
|
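Here we put! ten messages to the server, and read them back again.
| ;; The opening binding is missing from this listing. It is reconstructed here on the
;; assumption that conn is a websocket client for the /echo route defined above.
(let [conn @(http/websocket-client "ws://localhost:10000/echo")]
  (s/put-all! conn
    (->> 10 range (map str)))
  (->> conn
    (s/transform (take 10))
    s/stream->seq
    doall))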
|
|
=> ("0" "1" "2" "3" "4" "5" "6" "7" "8" "9")
| |
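Here we create two clients, and have them speak to each other.
| ;; The bindings are missing from this listing. They are reconstructed here on the
;; assumption that both clients connect to the /chat route defined above.
(let [conn1 @(http/websocket-client "ws://localhost:10000/chat")
      conn2 @(http/websocket-client "ws://localhost:10000/chat")]
  ;; sign in: the first message names the room, the second names the user
  (s/put-all! conn1 ["shoes and ships" "Alice"])
  (s/put-all! conn2 ["shoes and ships" "Bob"])
  (s/put! conn1 "hello")
  @(s/take! conn1)
  @(s/take! conn2)
  (s/put! conn2 "hi!")
  @(s/take! conn1)
  @(s/take! conn2))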
|
|
| |
| |
| |
| ( ns aleph.examples.http
( :import
[ io.netty.handler.ssl SslContextBuilder ] )
( :require
[ compojure.core :as compojure :refer [ GET ] ]
[ ring.middleware.params :as params ]
[ compojure.route :as route ]
[ compojure.response :refer [ Renderable ] ]
[ aleph.http :as http ]
[ byte-streams :as bs ]
[ manifold.stream :as s ]
[ manifold.deferred :as d ]
[ clojure.core.async :as a ]
[ clojure.java.io :refer [ file ] ] ) )
|
|
For HTTP, Aleph implements a superset of the
Ring spec, which means it can be
used as a drop-in replacement for pretty much any other Clojure webserver. In order to
allow for asynchronous responses, however, it allows for the use of
Manifold deferreds and streams. Uses of both
will be illustrated below.
| |
Complete documentation for the aleph.http namespace can be found here.
| |
building servers
| |
A basic Ring handler which immediately returns 'hello world'
| ( defn hello-world-handler
[ req ]
{ :status 200
:headers { "content-type" "text/plain" }
:body "hello world!" } )
|
|
A non-standard response handler that returns a deferred which yields a Ring response
after one second. In a typical Ring-compliant server, this would require holding onto a
thread via Thread.sleep() or a similar mechanism, but the use of a deferred allows for
the thread to be immediately released without an immediate response.
This is an atypical usage of manifold.deferred/timeout! , which puts a 'timeout value'
into a deferred after an interval if it hasn't already been realized. Here there's nothing
else trying to touch the deferred, so it will simply yield the 'hello world' response after
1000 milliseconds.
| ( defn delayed-hello-world-handler
[ req ]
( d/timeout!
( d/deferred )
1000
( hello-world-handler req ) ) )
|
|
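The two outcomes of timeout! can be seen in a short sketch outside of any handler. The names below are illustrative, and the 50 millisecond interval is chosen only to keep the example quick.

```clojure
(ns example.timeout
  (:require [manifold.deferred :as d]))

;; Nothing else realizes this deferred, so it yields the timeout value.
(def untouched @(d/timeout! (d/deferred) 50 :timed-out))   ; => :timed-out

;; This deferred is realized before the timeout is attached, so the
;; timeout value is never used.
(def already-done
  (let [dfd (d/deferred)]
    (d/success! dfd :done)
    @(d/timeout! dfd 50 :timed-out)))                      ; => :done
```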
Compojure will normally dereference deferreds and return the realized value.
Unfortunately, this blocks the thread. Since Aleph can accept the unrealized
deferred, we extend Compojure's Renderable protocol to pass the deferred
through unchanged so it can be handled asynchronously.
| ( extend-protocol Renderable
clojure.lang.IDeref
( render [ d _ ] ( d/->deferred d ) ) )
|
|
Alternately, we can use a core.async goroutine to
create our response, convert the channel it returns using
manifold.stream/->source , and then take the first message from it. This is entirely equivalent
to the previous implementation.
| ( defn delayed-hello-world-handler
[ req ]
( s/take!
( s/->source
( a/go
( let [ _ ( a/<! ( a/timeout 1000 ) ) ]
( hello-world-handler req ) ) ) ) ) )
|
|
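The bridge between core.async and Manifold can be exercised on its own. In this sketch the go block stands in for the response-producing code above, and the value :done is a placeholder.

```clojure
(ns example.async-bridge
  (:require [clojure.core.async :as a]
            [manifold.stream :as s]))

;; a/go returns a channel that receives the block's result; ->source lets us
;; treat that channel as a Manifold source, and take! returns a deferred.
(def result
  @(s/take!
     (s/->source
       (a/go
         (a/<! (a/timeout 50))
         :done))))   ; => :done
```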
Returns a streamed HTTP response, consisting of newline-delimited numbers every 100
milliseconds. While this would typically be represented by a lazy sequence, instead we use
a Manifold stream. Similar to the use of the deferred above, this means we don't need
to allocate a thread per-request.
In this handler we're assuming the string value for count is a valid number. If not,
Integer.parseInt() will throw an exception, and we'll return a 500 status response
with the stack trace. If we wanted to be more precise in our status, we'd wrap the parsing
code with a try/catch that returns a 400 status when the count is malformed.
manifold.stream/periodically is similar to Clojure's repeatedly , except that it emits
the value returned by the function at a fixed interval.
| ( defn streaming-numbers-handler
[ { :keys [ params ] } ]
( let [ cnt ( Integer/parseInt ( get params "count" "0" ) ) ]
{ :status 200
:headers { "content-type" "text/plain" }
:body ( let [ sent ( atom 0 ) ]
( ->> ( s/periodically 100 #( str ( swap! sent inc ) "\n" ) )
( s/transform ( take cnt ) ) ) ) } ) )
|
|
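The try/catch mentioned above could look something like the following sketch. The names parse-count, bad-count-response and wrap-count are hypothetical and not part of this example's code; the idea is to turn a malformed count into a 400 response before the streaming handler ever runs.

```clojure
(ns example.bad-request)

(defn parse-count
  "Hypothetical helper: returns the count as an int, or nil if it is malformed."
  [params]
  (try
    (Integer/parseInt (get params "count" "0"))
    (catch NumberFormatException _
      nil)))

(def bad-count-response
  {:status 400
   :headers {"content-type" "text/plain"}
   :body "count must be an integer"})

(defn wrap-count
  "Hypothetical middleware: passes the parsed count along under :count, or
  short-circuits with a 400 response."
  [handler]
  (fn [{:keys [params] :as req}]
    (if-let [cnt (parse-count params)]
      (handler (assoc req :count cnt))
      bad-count-response)))

;; With a stub handler that just echoes the parsed count:
((wrap-count :count) {:params {"count" "7"}})     ; => 7
((wrap-count :count) {:params {"count" "seven"}}) ; => the 400 response
```

A count of 0 still reaches the handler, since 0 is truthy in Clojure; only a failed parse yields nil.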
However, we can always still use lazy sequences. This is still useful when the upstream
data provider exposes the stream of data as an Iterator or a similar blocking mechanism.
This will, however, hold onto a thread until the sequence is exhausted.
| ( defn streaming-numbers-handler
[ { :keys [ params ] } ]
( let [ cnt ( Integer/parseInt ( get params "count" "0" ) ) ]
{ :status 200
:headers { "content-type" "text/plain" }
:body ( ->> ( range cnt )
( map #( do ( Thread/sleep 100 ) % ) )
( map #( str % "\n" ) ) ) } ) )
|
|
We can also take a core.async channel and coerce it to a Manifold source via
manifold.stream/->source . All three implementations of streaming-numbers-handler stream
their response in the same way, although the first starts counting at 1 and the other two at 0.
| ( defn streaming-numbers-handler
[ { :keys [ params ] } ]
( let [ cnt ( Integer/parseInt ( get params "count" "0" ) )
body ( a/chan ) ]
( a/go-loop [ i 0 ]
( if ( < i cnt )
( let [ _ ( a/<! ( a/timeout 100 ) ) ]
( a/>! body ( str i "\n" ) )
( recur ( inc i ) ) )
( a/close! body ) ) )
{ :status 200
:headers { "content-type" "text/plain" }
:body ( s/->source body ) } ) )
|
|
This handler defines a set of endpoints via Compojure's routes macro. Notice that above
we've added the GET macro via :refer so it doesn't have to be qualified. We wrap the
result in ring.middleware.params/wrap-params so that we can get the count parameter in
streaming-numbers-handler .
Notice that at the bottom we define a default compojure.route/not-found handler, which
will return a 404 status. If we don't do this, a call to a URI we don't recognize will
return a nil response, which will cause Aleph to log an error.
| ( def handler
( params/wrap-params
( compojure/routes
( GET "/hello" [ ] hello-world-handler )
( GET "/delayed_hello" [ ] delayed-hello-world-handler )
( GET "/numbers" [ ] streaming-numbers-handler )
( route/not-found "No such page." ) ) ) )
|
|
| ( def s ( http/start-server handler { :port 10000 } ) )
|
|
using the http client
| |
Here we immediately dereference the response, get the :body , which is an InputStream,
and coerce it to a string using byte-streams/to-string .
| |
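A self-contained sketch of such a request follows. To keep it independent of the server defined earlier, it starts its own throwaway server; the namespace, the hello handler and port 10001 are assumptions made for this illustration.

```clojure
(ns example.http-client
  (:require [aleph.http :as http]
            [byte-streams :as bs]))

(defn hello
  "Stand-in for hello-world-handler."
  [_req]
  {:status 200
   :headers {"content-type" "text/plain"}
   :body "hello world!"})

(def body
  (let [server (http/start-server hello {:port 10001})]
    (try
      ;; dereference the response, take its :body, and coerce it to a string
      (-> @(http/get "http://localhost:10001/hello")
        :body
        bs/to-string)
      (finally
        (.close ^java.io.Closeable server)))))
```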
=> "hello world!"
| |
And we do the same exact thing for the delayed_hello endpoint, which returns an identical
result after a second's pause.
| |
=> (beat) "hello world!"
| |
Instead of dereferencing the response, we can use manifold.deferred/chain to compose
operations over it. Here we dereference the final result so that we don't close the server
before the response is complete, but we could also perform some side effect further down
the chain if we want to completely avoid blocking operations.
| |
=> (beat) "hello world!"
| |
Here we take a line-delimited streaming response, and coerce it to a lazy sequence of
strings via byte-streams/to-line-seq .
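| ;; The start of this expression is missing from the listing. The URL is reconstructed
;; from the /numbers route and port 10000 used by the server above.
(->> @(http/get "http://localhost:10000/numbers"
        {:query-params {:count 10}})
  :body
  bs/to-line-seq
  (map #(Integer/parseInt %))
  doall)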
|
|
=> (0 1 2 3 4 5 6 7 8 9)
| |
By default, the :body of any response is a java.io.InputStream . However, this means
that our consumption of the body needs to be synchronous, as shown above by coercing it
to a Clojure seq. If we want the body to be asynchronous, we need to set
:raw-stream? to true in the connection options of the request's connection pool.
| ( def raw-stream-connection-pool ( http/connection-pool { :connection-options { :raw-stream? true } } ) )
|
|
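| ;; The http/get call is truncated in the listing. The URL is reconstructed from the
;; /numbers route and port 10000 used by the server above.
@(d/chain
   (http/get "http://localhost:10000/numbers"
     {:query-params {:count 10}
      :pool raw-stream-connection-pool})
   :body
   #(s/map bs/to-byte-array %)
   #(s/reduce conj [] %)
   bs/to-string)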
|
|
In the above example, we coerce a stream of Netty ByteBuf objects into a stream of byte[]
before asynchronously accumulating them and coercing the entire collection into a String
once the stream is exhausted.
This is a useful indirection, since ByteBuf objects are
reference counted, and generally
add a lot of complexity to the code in all but the simplest use cases. Coercing the ByteBuf
objects to any other form (byte[] , String , etc.) will copy the bytes and decrement
the ref-count, which is almost always what's wanted.
However, if we're simply forwarding the bytes to another network socket, or want to minimize
memory copies at all costs, leaving the ByteBuf objects in their native form is the way to
go.
| |
| |
TLS client certificate authentication
| |
Aleph also supports TLS client certificate authentication. To make such connections, we must
build a custom SSL context and pass it to a connection pool that we'll use to make HTTP requests.
| |
Given the certificate authority (ca)'s certificate, a client certificate, and the key for the
client certificate in PKCS#8 format, we can build an SSL context for mutual TLS authentication.
| ( defn build-ssl-context
[ ca cert key ]
( -> ( SslContextBuilder/forClient )
( .trustManager ( file ca ) )
( .keyManager ( file cert ) ( file key ) )
.build ) )
|
|
To use the SSL context, we set the :ssl-context connection option on a connection pool. This
allows the pool to make TLS connections with our client certificate.
| ( defn build-ssl-connection-pool
[ ca cert key ]
( http/connection-pool
{ :connection-options
{ :ssl-context ( build-ssl-context ca cert key ) } } ) )
|
|
| ( def ssl-connection-pool
( build-ssl-connection-pool "path/to/ca.crt" "path/to/cert.crt" "path/to/key.k8" ) )
|
|
We can use our ssl-connection-pool to GET pages from our target endpoint by passing it as the
:pool option to aleph.http/get .
| @ ( d/chain
( http/get
{ :pool ssl-connection-pool } )
:body
bs/to-string )
|
|
| |