akka & spray

Actors, HTTP and Reactive Streams


Mathias Doenitz

This presentation: http://spray.io/jax14/

Imagine this task

You are to build a business application


This is your hardware



  • How to scale gradually?
    • vertically (up, across cores of one machine)
    • horizontally (out, across many machines)
  • How to react to
    • hardware failures?
    • software failures?
    • network failures?

How do you do it?

  • What language(s) do you use?
  • What tools/libraries do you use?
  • More generally:
    What programming paradigm?

The scenario is real!

35 years of CPU trends
Source: Chuck Moore

The new opponent: Amdahl's Law

Parallelisation is key!
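Amdahl's Law quantifies why. If only a fraction p of a program can run in parallel, N cores give a speedup of at most S(N) = 1 / ((1 - p) + p / N), which approaches 1 / (1 - p) as N grows. The small Scala helper below evaluates the formula; the names `Amdahl` and `amdahlSpeedup` are ours, chosen purely for illustration.

```scala
object Amdahl {
  /** Maximum speedup predicted by Amdahl's Law.
    * @param p fraction of the work that can be parallelised (0.0 to 1.0)
    * @param n number of processors (at least 1)
    */
  def amdahlSpeedup(p: Double, n: Int): Double = {
    require(p >= 0.0 && p <= 1.0, "p must be between 0 and 1")
    require(n >= 1, "n must be at least 1")
    1.0 / ((1.0 - p) + p / n)
  }

  // Prints the predicted speedup for a program that is 95% parallelisable.
  def main(args: Array[String]): Unit =
    for (n <- Seq(1, 2, 8, 64, 1024))
      println(f"p = 0.95, n = $n%4d: speedup ${amdahlSpeedup(0.95, n)}%.2f")
}
```

With p = 0.95 the speedup can never exceed 1 / 0.05 = 20, however many cores are added. Shrinking the serial (non-parallelisable) part therefore matters as much as adding hardware.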

Our old tools don't cut it

  • Threads (programmed directly)
    • high memory overhead
    • starting/stopping is expensive
    • inter-thread communication entirely left to the user
  • Locks/Mutexes/Semaphores/`synchronized`/`volatile`
    • too little sync: race conditions, wrong results
    • too much sync: deadlocks, poor performance
    • very hard to use correctly
  • We need something better!

The Core Problem

Shared Mutable State
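A minimal illustration of the problem, using nothing beyond the JDK, has several threads incrementing one plain `var`. Because `counter += 1` is a read-modify-write sequence, concurrent increments can overwrite each other, so the final count may come out too low. The `AtomicInteger` variant is included only as a contrast. The object and method names are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger

object LostUpdates {
  // Runs `body` concurrently on `threads` threads and waits for all of them.
  private def inParallel(threads: Int)(body: => Unit): Unit = {
    val workers = Seq.fill(threads)(new Thread(() => body))
    workers.foreach(_.start())
    workers.foreach(_.join())
  }

  // Shared mutable state without synchronization: updates can be lost.
  def unsynchronized(threads: Int, perThread: Int): Int = {
    var counter = 0
    inParallel(threads) {
      for (_ <- 1 to perThread) counter += 1 // read-modify-write, not atomic
    }
    counter
  }

  // Same work with an atomic counter: every increment is preserved.
  def atomic(threads: Int, perThread: Int): Int = {
    val counter = new AtomicInteger(0)
    inParallel(threads) {
      for (_ <- 1 to perThread) counter.incrementAndGet()
    }
    counter.get
  }
}
```

The unsynchronized result can differ from run to run. That is exactly what makes such bugs hard to reproduce.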


"A toolkit and runtime for building
highly concurrent, distributed, and fault-tolerant
event-driven applications on the JVM"

Source: http://akka.io

Akka's Promise

"Build powerful concurrent & distributed
applications more easily"

Source: http://akka.io

Core abstraction: Actor

  • a lightweight isolated "process"
  • contains state and "behavior"
  • communicates only via async & immutable messages
    (share nothing)
  • has a mailbox (message queue)
  • is supervised by its parent (for managing failure)
  • is location transparent (distributable)

An Actor in Scala

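import akka.actor.Actor

class CountingActor extends Actor {
  var counter = 0 // private state, only touched while processing a message

  def receive = {
    case "ping"  ⇒ println("received ping")
    case "count" ⇒ counter += 1
    case "get"   ⇒ sender() ! counter // reply to whoever sent "get"
  }
}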

Actors vs. Objects

  • An actor is an object, but
  • you can't peek inside it
  • you don't call methods (but send messages)
  • you don't get return values (but receive messages)
  • it is internally thread-safe

Actor Benefits

  • "An island of sanity in a sea of concurrency"
    • processes one message at a time
    • runs purely sequential easy-to-understand logic
  • very lightweight (~400 bytes)
  • can be constructed and torn down very quickly
  • leaves scheduling complexities to runtime
  • promotes high-granularity modularization
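To make "one message at a time" concrete, here is a toy model in plain Scala (no Akka), written for illustration only. A private single-threaded executor serves as the mailbox, so the behavior function never runs concurrently with itself and its `var` needs no locking. `ToyActor`, `CounterMsg` and the `Promise`-based reply are our own assumptions. Akka does not work this way internally (it multiplexes many actors over a shared thread pool), and a real Akka actor would reply with `sender() ! counter` instead.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Toy model only: the executor's task queue plays the role of the mailbox.
final class ToyActor[M](behavior: M => Unit) {
  private val mailbox: ExecutorService = Executors.newSingleThreadExecutor()

  // "tell": enqueue the message and return immediately (fire-and-forget).
  def !(msg: M): Unit = mailbox.execute(() => behavior(msg))

  def stop(): Unit = {
    mailbox.shutdown()
    mailbox.awaitTermination(5, TimeUnit.SECONDS)
  }
}

sealed trait CounterMsg
case object Ping extends CounterMsg
case object Count extends CounterMsg
final case class Get(replyTo: Promise[Int]) extends CounterMsg

object ToyActorDemo {
  def run(senders: Int, countsPerSender: Int): Int = {
    var counter = 0 // private to the actor: only the behavior touches it
    val counting = new ToyActor[CounterMsg]({
      case Ping         => println("received ping")
      case Count        => counter += 1
      case Get(replyTo) => replyTo.success(counter) // reply by completing a promise
    })

    // Many threads send concurrently; the mailbox serializes the messages.
    val threads = Seq.fill(senders)(
      new Thread(() => (1 to countsPerSender).foreach(_ => counting ! Count)))
    threads.foreach(_.start())
    threads.foreach(_.join())

    val reply = Promise[Int]()
    counting ! Get(reply)
    val result = Await.result(reply.future, 5.seconds)
    counting.stop()
    result
  }
}
```

Because the single mailbox thread processes messages in arrival order, the `Get` sent after all senders have finished always sees every `Count`.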

A different programming paradigm

Akka: more than just actors

  • Scala- and Java APIs across the board
  • akka-actor: actors
  • akka-cluster: fault-tolerant, decentralized
    peer-to-peer cluster membership service
  • akka-io: low-level network IO (TCP and UDP)
  • akka-persistence: event-sourcing
  • akka-http: HTTP/REST (soon, today: http://spray.io)

What is spray?

  • embeddable HTTP stack for your
    Akka (Scala) applications
  • focus: HTTP integration layers
    rather than web applications
  • server- and client-side

But, why?

Isn't HTTP on the JVM a "solved" problem?

Can't we just use Netty?
(or Servlets, or Restlet, or Undertow, ...)

Yes, we can

(it's being done all the time)

But: Do we want to?

Not really!

  • servlet containers?
  • XML configuration?
  • mutable data models / APIs?
  • Java Collections?
  • adapter layers?
  • limited type-safety?

What we want

  • `case class`-based model
  • actor-based APIs (message protocols)
  • functions as values
  • Scala/Akka Futures
  • Scala collections
  • type classes
  • type safety

What we also want

  • unified thread-pool mgmt. (Akka dispatchers)
  • unified configuration (Typesafe config)
  • unified logging (Akka event bus)
  • unified debugging / optimization
    (e.g. with Typesafe console)

We want to build on Akka!

  • our whole application, that is,
    not just a few bits!
  • same principles, concepts and coding
    style in all layers of the stack:
    much easier problem analysis & tuning

spray builds on Akka

  • entirely built in Scala, no wrapping of Java libraries
  • fully async and non-blocking
  • only one type of active component in all layers: actors
  • core API style: message protocol
  • actor-friendly (e.g. "tell don't ask")
  • fast, lightweight, modular, testable

spray components


plus: spray-servlet, spray-testkit, ...

HTTP model

  • `case class`-based data model
  • high-level abstractions for most things HTTP
  • fully immutable, little logic
  • predefined instances for common media types, status codes, encodings, charsets, cache-control directives, etc.
  • open for extension
    (e.g. registration of custom media types)

HTTP model: show me code

case class HttpRequest(
  method: HttpMethod = HttpMethods.GET,
  uri: Uri = Uri./,
  headers: List[HttpHeader] = Nil,
  entity: HttpEntity = HttpEntity.Empty,
  protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage

HTTP model: show me code

case class HttpResponse(
  status: StatusCode = StatusCodes.OK,
  entity: HttpEntity = HttpEntity.Empty,
  headers: List[HttpHeader] = Nil,
  protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
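Because the model consists of immutable case classes with default arguments, building a message takes a single expression, and "changing" one means creating a modified copy. The sketch below uses simplified, hypothetical stand-ins (`Header`, `Request`, `Response`) rather than spray's real types, so it runs with the Scala standard library alone.

```scala
// Hypothetical, simplified stand-ins for spray's HttpHeader/HttpRequest/HttpResponse.
final case class Header(name: String, value: String)

final case class Request(
  method: String = "GET",
  uri: String = "/",
  headers: List[Header] = Nil,
  entity: String = "")

final case class Response(
  status: Int = 200,
  entity: String = "",
  headers: List[Header] = Nil)

object ModelDemo {
  // Defaults keep the common case short: only the URI differs from the defaults.
  val ping: Request = Request(uri = "/ping")

  // "Changing" an immutable message creates a modified copy; `ping` is untouched,
  // so both values can be shared between actors without any locking.
  val authorizedPing: Request =
    ping.copy(headers = Header("Authorization", "Bearer some-token") :: ping.headers)

  val pong: Response = Response(entity = "PONG")
}
```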

HTTP model: show me code

case class Uri(             // proper RFC 3986
  scheme: String,           // compliant,
  authority: Authority,     // immutable
  path: Path,               // URI model
  query: Query,             // with a fast,
  fragment: Option[String]) // custom parser

HTTP model: show me code

case class `Accept-Charset`(charsetRanges: Seq[HttpCharsetRange])
  extends HttpHeader

case class `Accept-Encoding`(encodings: Seq[HttpEncodingRange])
  extends HttpHeader

case class `Set-Cookie`(cookie: HttpCookie)
  extends HttpHeader

case class RawHeader(name: String, value: String)
  extends HttpHeader
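Modelling each header as its own case class lets application code pattern-match on headers instead of parsing strings, while `RawHeader` keeps unknown headers accessible. Below is a stand-alone sketch with a hypothetical, cut-down header hierarchy. Field names such as `cookieName` and `cookieValue` are ours, not spray's.

```scala
// Hypothetical, simplified header hierarchy in the spirit of the slide.
sealed trait HttpHeader { def name: String }

final case class `Accept-Encoding`(encodings: Seq[String]) extends HttpHeader {
  def name: String = "Accept-Encoding"
}
final case class `Set-Cookie`(cookieName: String, cookieValue: String) extends HttpHeader {
  def name: String = "Set-Cookie"
}
final case class RawHeader(name: String, value: String) extends HttpHeader

object HeaderDemo {
  val headers: List[HttpHeader] = List(
    RawHeader("X-Request-Id", "42"),
    `Accept-Encoding`(Seq("gzip", "deflate")),
    `Set-Cookie`("session", "abc123"))

  // Typed access: match on the header class; fields come out already parsed.
  def acceptsGzip(hs: List[HttpHeader]): Boolean =
    hs.exists {
      case `Accept-Encoding`(encodings) => encodings.contains("gzip")
      case _                            => false
    }

  // Headers without a dedicated class are still available by (case-insensitive) name.
  def rawValue(hs: List[HttpHeader], name: String): Option[String] =
    hs.collectFirst { case RawHeader(n, v) if n.equalsIgnoreCase(name) => v }
}
```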

Low-level HTTP layer

  • directly sits on top of Akka IO
  • performs TCP ↔ HTTP "translation"
  • cleanly separated layer of actors
    provided as an Akka Extension
  • implements "essentials",
    no higher-level features (like file serving)

The IO stack


Akka IO

  • bridges the gap between Java NIO and Akka actors
  • msg-based API surfaces the nature of the network
  • events come in, e.g. «connection established»,
    «bytes received», «connection closed», «error occurred»
  • commands are sent from our side, e.g.
    «attempt connecting», «send bytes», «close connection»
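The event/command split can be sketched as two sealed message hierarchies plus a handler that maps incoming events to outgoing commands. The names below are hypothetical simplifications of the bullets above. In Akka IO itself the corresponding messages live in `akka.io.Tcp` (for example `Tcp.Connect`, `Tcp.Connected`, `Tcp.Received`, `Tcp.Write`, `Tcp.Close` and `Tcp.PeerClosed`), and a handler would be an actor rather than a pure function.

```scala
// Events: things the network tells us about (hypothetical, simplified names).
sealed trait Event
final case class Connected(remote: String) extends Event
final case class BytesReceived(data: Vector[Byte]) extends Event
case object ConnectionClosed extends Event
final case class ErrorOccurred(cause: String) extends Event

// Commands: things we ask the network layer to do.
sealed trait Command
final case class AttemptConnecting(host: String, port: Int) extends Command // client side
final case class SendBytes(data: Vector[Byte]) extends Command
case object CloseConnection extends Command

object EchoProtocol {
  // An echo handler as a pure function from an incoming event to the commands it triggers.
  def handle(event: Event): List[Command] = event match {
    case Connected(_)         => Nil                    // nothing to do until data arrives
    case BytesReceived(bytes) => List(SendBytes(bytes)) // echo the bytes back
    case ConnectionClosed     => Nil                    // peer is gone, nothing to send
    case ErrorOccurred(_)     => List(CloseConnection)  // give up on this connection
  }
}
```

Since both hierarchies are sealed, the compiler warns if the handler forgets an event, which is one benefit of a message protocol over callbacks.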

spray-can
  • provides message-based APIs on multiple levels
    (server-side: connection-level,
    client-side: connection-, host- and request-level)
  • maximum throughput with acceptable latency
  • massive numbers of concurrent connections
  • HTTP pipelining
  • chunked messages (streaming)
  • SSL/TLS encryption

spray-can: show me code

import akka.actor.Actor
import spray.can.Http
import spray.http._
import HttpMethods._

class PingPongService extends Actor {
  def receive = {
    // when a new connection comes in we register
    // ourselves as the connection handler
    case _: Http.Connected ⇒ sender() ! Http.Register(self)

    // can you guess what this does?
    case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒
      sender() ! HttpResponse(entity = "PONG")
  }
}

spray-routing
  • internal DSL for the direct interface layer to the application
  • type-safe, yet flexible (thanks to shapeless)
  • much more than just routing: behavior definition
  • small and simple building blocks: directives
  • highly composable
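How small directives compose can be shown with a toy model in plain Scala. Here a route is just a function from a request to an optional response (`None` meaning "rejected"), a directive wraps an inner route, and `~` tries the next alternative on rejection. This is a simplification made for illustration, not spray's implementation (in spray a `Route` is a `RequestContext => Unit`). All names are hypothetical, although `get`, `put`, `complete` and `~` deliberately echo spray's.

```scala
// Hypothetical request type, reduced to what the routes below inspect.
final case class Req(method: String, path: String)

object ToyRouting {
  // A route either handles a request (Some(body)) or rejects it (None).
  type Route = Req => Option[String]

  implicit class RouteOps(private val self: Route) extends AnyVal {
    // `a ~ b`: try `a`; if it rejects, try `b` with the same request.
    def ~(other: Route): Route = req => self(req).orElse(other(req))
  }

  // Directive: only pass requests with the given method to the inner route.
  private def method(m: String)(inner: Route): Route =
    req => if (req.method == m) inner(req) else None

  def get(inner: Route): Route = method("GET")(inner)
  def put(inner: Route): Route = method("PUT")(inner)

  // Directive that extracts a value: "/order/<hex>" yields the parsed id.
  private val OrderPath = "/order/([0-9a-fA-F]{1,7})".r
  def orderPath(inner: Int => Route): Route = req =>
    req.path match {
      case OrderPath(hex) => inner(Integer.parseInt(hex, 16))(req)
      case _              => None
    }

  // Terminal directive: always accepts and produces the given body.
  def complete(body: => String): Route = _ => Some(body)

  val route: Route =
    orderPath { id =>
      get {
        complete("Received GET request for order " + id)
      } ~
      put {
        complete("Received PUT request for order " + id)
      }
    }
}
```

Each directive is an ordinary function, so it can be tested on its own and nested or chained freely. This is the property the bullets above refer to as "highly composable".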

API Layer: How it fits in


API Layer Responsibilities

  • request routing based on method, path, query, entity
  • (Un)marshalling to / from domain objects
  • encoding / decoding (compression)
  • authentication / authorization
  • caching and serving static content
  • RESTful error handling

API Layer: show me code

import spray.routing.HttpServiceActor

class MyServiceActor extends HttpServiceActor {
  def receive = runRoute {
    path("order" / HexIntNumber) { id =>
      get {
        complete {
          "Received GET request for order " + id
        }
      } ~
      put {
        complete {
          "Received PUT request for order " + id
        }
      }
    }
  }
}
Predefined Directives

alwaysCache, anyParam, anyParams, authenticate, authorize, autoChunk, cache, cachingProhibited, cancelAllRejections, cancelRejection, clientIP, complete, compressResponse, compressResponseIfRequested, cookie, decodeRequest, decompressRequest, delete, deleteCookie, detach, dynamic, dynamicIf, encodeResponse, entity, extract, failWith, formField, formFields, get, getFromBrowseableDirectories, getFromBrowseableDirectory, getFromDirectory, getFromFile, getFromResource, getFromResourceDirectory, handleExceptions, handleRejections, handleWith, head, headerValue, headerValueByName, headerValuePF, hextract, host, hostName, hprovide, jsonpWithParameter, listDirectoryContents, logRequest, logRequestResponse, logResponse, mapHttpResponse, mapHttpResponseEntity, mapHttpResponseHeaders, mapHttpResponsePart, mapInnerRoute, mapRejections, mapRequest, mapRequestContext, mapRouteResponse, mapRouteResponsePF, method, noop, onComplete, onFailure, onSuccess, optionalCookie, optionalHeaderValue, optionalHeaderValueByName, optionalHeaderValuePF, options, overrideMethodWithParameter, parameter, parameterMap, parameterMultiMap, parameters, parameterSeq, pass, patch, path, pathPrefix, pathPrefixTest, pathSuffix, pathSuffixTest, post, produce, provide, put, rawPath, rawPathPrefix, rawPathPrefixTest, redirect, reject, rejectEmptyResponse, requestEncodedWith, requestEntityEmpty, requestEntityPresent, respondWithHeader, respondWithHeaders, respondWithLastModifiedHeader, respondWithMediaType, respondWithSingletonHeader, respondWithSingletonHeaders, respondWithStatus, responseEncodingAccepted, rewriteUnmatchedPath, routeRouteResponse, scheme, schemeName, setCookie, unmatchedPath, validate

Real-World Example

lazy val route = {
  encodeResponse(Gzip) {
    path("") {
      get { ... }
    } ~
    pathPrefix("api") {
      jsonpWithParameter("callback") {
        path("top-articles") {
          get {
            parameter("max".as[Int]) { max =>
              validate(max >= 0, "query parameter 'max' must be >= 0") {
                complete {
                  (topArticlesService ? max).mapTo[Seq[Article]]
                }
              }
            }
          }
        } ~
        tokenAuthenticate { user =>
          path("ranking") {
            get {
              countAndTime(user, "ranking") {
                parameters("fixed" ? 0, "mobile" ? 0, "sms" ? 0, "mms" ? 0,
                           "data" ? 0).as(RankingDescriptor) { descr =>
                  complete {
                    (rankingService ? Ranking(descr)).mapTo[RankingResult]
                  }
                }
              }
            }
          } ~
          path("accounts") {
            post {
              authorize(user.isAdmin) {
                entity(as[AccountDetails]) { details =>
                  complete {
                    (accountService ? NewAccount(details)).mapTo[OpResult]
                  }
                }
              }
            }
          } ~
          path("account" / IntNumber) { accountId =>
            get { ... } ~
            put { ... } ~
            delete { ... }
          }
        }
      } ~
      pathPrefix("v1") { ... }
    } ~
    pathPrefix("doc") {
      respondWithHeader(`Cache-Control`(`max-age`(3600))) {
        transformResponse(_.withContentTransformed(markdown2Html)) {
          ... pathRewriter = appendFileExt)
        }
      }
    } ~
    ...
  } ~
  cacheIfEnabled {
    encodeResponse(Gzip) { ... }
  }
}

Best Practices

  • Keep route structure clean and readable:
    pull out all logic into custom directives
  • Don’t let the API layer leak into the application
  • Use (Un)marshalling infrastructure
  • Think about error handling/reporting
    right from the start
  • Use sbt-revolver for fast dev turn-around

There is more...

Current State

  • Latest releases: spray 1.0.1, 1.1.1, 1.2.1 and 1.3.1
  • Currently: spray becomes akka-http
  • Play will gradually move onto akka-http
  • Then: Improvements, features, rounding off
    • websockets
    • HTTP 2.0
    • ...


  • Only two modules (down from 10)
    • akka-http-core
    • akka-http
  • No more spray-servlet (at least for now)
  • Java APIs
  • Main improvement:
    Everything now based on Reactive Streams

Towards Reactive Streams

  • What is a stream?
    • Ephemeral flow of data
    • Focused on describing transformation
    • Possibly unbounded in size

Common Uses of Streams

  • Bulk data transfer
  • Real-time data sources
  • Batch processing of large data sets
  • Monitoring and analytics
  • Scaling Business Logic

Example: Scaling Logic

Fine-grained Parallelisation is key!

Async Boundary


Async Boundary

  • Data elements flow downstream
  • Demand flows upstream
  • Data items flow only when there is demand
  • Recipient is in control of incoming data rate
  • Data in flight is bounded by signaled demand
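The demand protocol can be illustrated with a deliberately simplified, synchronous toy in plain Scala. The `Publisher`/`Subscriber`/`Subscription` traits below are local stand-ins modelled loosely on the Reactive Streams interfaces, with error signalling and the spec's threading rules omitted. The subscriber asks for data in batches of two, so no more than two elements are ever in flight.

```scala
import scala.collection.mutable.ArrayBuffer

// Local, simplified stand-ins (no onError, no thread-safety: single-threaded only).
trait Subscription {
  def request(n: Long): Unit
  def cancel(): Unit
}
trait Subscriber[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(elem: T): Unit
  def onComplete(): Unit
}
trait Publisher[T] {
  def subscribe(s: Subscriber[T]): Unit
}

// Emits elements only while there is outstanding demand.
final class IteratorPublisher[T](elems: Iterator[T]) extends Publisher[T] {
  def subscribe(sub: Subscriber[T]): Unit = {
    var demand = 0L
    var cancelled = false
    var emitting = false // guards against re-entrant request() calls from onNext
    var completed = false
    sub.onSubscribe(new Subscription {
      def request(n: Long): Unit = {
        demand += n
        if (!emitting) {
          emitting = true
          while (demand > 0 && !cancelled && elems.hasNext) {
            demand -= 1
            sub.onNext(elems.next())
          }
          if (!cancelled && !completed && !elems.hasNext) {
            completed = true
            sub.onComplete()
          }
          emitting = false
        }
      }
      def cancel(): Unit = cancelled = true
    })
  }
}

// Signals demand in batches; asks for more only after a whole batch has arrived.
final class BatchingSubscriber(batchSize: Int) extends Subscriber[Int] {
  private var subscription: Subscription = null
  private var outstanding = 0L
  var maxOutstanding = 0L
  var completed = false
  val received = ArrayBuffer.empty[Int]

  def onSubscribe(s: Subscription): Unit = { subscription = s; requestBatch() }
  def onNext(elem: Int): Unit = {
    received += elem
    outstanding -= 1
    if (outstanding == 0) requestBatch()
  }
  def onComplete(): Unit = completed = true

  private def requestBatch(): Unit = {
    outstanding += batchSize
    maxOutstanding = math.max(maxOutstanding, outstanding)
    subscription.request(batchSize)
  }
}
```

Because `request` may be called from inside `onNext`, the publisher guards against re-entrant emission. Real implementations have to handle the same situation across threads.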

Dynamic Push/Pull

  • “Push” behavior when consumer is faster
  • “Pull” behavior when producer is faster
  • Switches automatically between these
  • Batching demand allows batching data

Async Boundaries are Everywhere

  • Between actors
  • Between threads
  • Between CPUs
  • Between network nodes
  • Between applications

The Reactive Streams Project

  • Participants: engineers from
    • Netflix
    • Twitter
    • Red Hat
    • Pivotal
    • Typesafe
    • spray.io


  • A common, minimal standard interface
    for handing data across async boundaries
  • Complete freedom for many idiomatic APIs
  • Interoperability to make best use of efforts
  • You should be able to say:

Reactive Streams in akka-http

case class HttpIncomingConnection(
  remoteAddress: InetSocketAddress,
  requestProducer: Producer[HttpRequest],
  responseConsumer: Consumer[HttpResponse])

Reactive Streams in akka-http

case class HttpRequest(
  method: HttpMethod,
  uri: Uri,
  headers: List[HttpHeader],
  entity: Producer[ByteString],
  protocol: HttpProtocol)

Getting started


Q & A