akka & spray

Actors, IO, HTTP & Reactive Streams

2014-04-29 Zurich Scala Enthusiasts Meetup

Mathias Doenitz

This presentation: http://spray.io/zse/

Imagine this task

You are to build a business application

[Figure: application]

This is your hardware

[Figure: application]

Challenges

  • How to scale gradually?
    • vertically (up, across cores of one machine)
    • horizontally (out, across many machines)
  • How to react to
    • hardware failures?
    • software failures?
    • network failures?

How do you do it?

  • What language(s) do you use?
  • What tools/libraries do you use?
  • More generally:
    What programming paradigm?

The scenario is real!

35 years of CPU trends
[Figure: CPU trends over 35 years]
Source: Chuck Moore

The new opponent: Amdahl's Law

[Figure: Amdahl's Law]
Parallelisation is key!
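
Amdahl's Law bounds the achievable speedup when only a fraction p of the work can run in parallel: speedup(n) = 1 / ((1 - p) + p / n). A minimal sketch of that formula follows; the object and method names are made up for illustration.

```scala
object Amdahl {
  /** Theoretical speedup on `cores` cores when a fraction `p` (0.0 to 1.0)
    * of the work can run in parallel. */
  def speedup(p: Double, cores: Int): Double = {
    require(p >= 0.0 && p <= 1.0, "p must be between 0 and 1")
    require(cores >= 1, "need at least one core")
    1.0 / ((1.0 - p) + p / cores)
  }

  /** Upper bound on the speedup as the number of cores grows without limit. */
  def limit(p: Double): Double = 1.0 / (1.0 - p)

  def main(args: Array[String]): Unit =
    for (cores <- Seq(1, 2, 4, 16, 256))
      println(f"95%% parallel, $cores%3d cores: ${speedup(0.95, cores)}%.2fx")
}
```

Even with 95% of the work parallelised, the speedup can never exceed 20x, which is why shrinking the sequential part matters so much.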

Our old tools don't cut it

  • Threads (programmed directly)
    • high memory overhead
    • starting/stopping is expensive
    • inter-thread communication entirely left to the user
  • Locks/Mutexes/Semaphores/`synchronized`/`volatile`
    • too little sync: race conditions, wrong results
    • too much sync: deadlocks, poor performance
    • very hard to use correctly
  • We need something better!

The Core Problem

Shared Mutable State

Akka

"A toolkit and runtime for building
highly concurrent, distributed, and fault-tolerant
event-driven applications on the JVM"

Source: http://akka.io

Akka's Promise

"Build powerful concurrent & distributed
applications more easily"

Source: http://akka.io

Core abstraction: Actor

  • a lightweight isolated "process"
  • contains state and "behavior"
  • communicates only via async & immutable messages
    (share nothing)
  • has a mailbox (message queue)
  • is supervised by its parent (for managing failure)
  • is location transparent (distributable)

An Actor in Scala


class CountingActor extends Actor {
  var counter = 0
  def receive = {
    case "ping"  ⇒ println("received ping")
    case "count" ⇒ counter += 1
    case "get"   ⇒ sender ! counter
  }
}
          

Actors vs. Objects

  • An actor is an object, but
  • you can't peek inside it
  • you don't call methods (but send messages)
  • you don't get return values (but receive messages)
  • it is internally thread-safe

Actor Benefits

  • "An island of sanity in a sea of concurrency"
    • processes one message at a time
    • runs purely sequential easy-to-understand logic
  • very lightweight (~400 bytes)
  • can be constructed and torn down very quickly
  • leaves scheduling complexities to runtime
  • promotes high-granularity modularization
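
The "one message at a time" guarantee is what makes an actor's mutable state safe without locks. The following is a toy illustration in plain Scala, not Akka's implementation: a hypothetical `TinyMailbox` hands every message to a single-threaded executor, so the handler's state is only ever touched by one thread, no matter how many threads send.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

/** Toy stand-in for an actor mailbox (hypothetical, not part of Akka):
  * messages may be sent from any thread but are handled strictly one by one. */
final class TinyMailbox[M](handler: M => Unit) {
  private val worker = Executors.newSingleThreadExecutor()

  /** Fire-and-forget send, loosely analogous to `actor ! msg`. */
  def !(msg: M): Unit = worker.execute(new Runnable {
    def run(): Unit = handler(msg)
  })

  /** Stops accepting messages and waits for the queue to drain. */
  def shutdownAndWait(): Boolean = {
    worker.shutdown()
    worker.awaitTermination(5, TimeUnit.SECONDS)
  }
}

object TinyMailboxDemo {
  /** Sends "count" from `senders` threads concurrently, returns the final count. */
  def countConcurrently(senders: Int, perSender: Int): Int = {
    var counter = 0 // unsynchronized on purpose: only the worker thread touches it
    val mailbox = new TinyMailbox[String]({
      case "count" => counter += 1
      case _       => () // ignore anything else
    })
    val done = new CountDownLatch(senders)
    for (_ <- 1 to senders) {
      new Thread(new Runnable {
        def run(): Unit = {
          for (_ <- 1 to perSender) mailbox ! "count"
          done.countDown()
        }
      }).start()
    }
    done.await()              // every message has been enqueued
    mailbox.shutdownAndWait() // every message has been processed
    counter
  }

  def main(args: Array[String]): Unit =
    println(countConcurrently(senders = 8, perSender = 1000))
}
```

No update is lost although `counter` is a plain `var`: the senders run concurrently, but the handler logic stays purely sequential.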

A different programming paradigm

Akka: more than just actors

  • Scala- and Java APIs across the board
  • akka-actor: actors
  • akka-cluster: fault-tolerant, decentralized
    peer-to-peer cluster membership service
  • akka-io: low-level network IO (TCP and UDP)
  • akka-persistence: event-sourcing
  • akka-http: HTTP/REST (coming soon; today: http://spray.io)

spray
http://spray.io

What is spray?

  • embeddable HTTP stack for your
    Akka (Scala) applications
  • focus: HTTP integration layers
    rather than web applications
  • server- and client-side

But, why?

Isn't HTTP on the JVM a "solved" problem?

Can't we just use Netty?
(or Servlets, or Restlet, or Undertow, ...)

Yes, we can

(it's being done all the time)


But: Do we want to?

Not really!

  • servlet containers?
  • XML configuration?
  • mutable data models / APIs?
  • Java Collections?
  • adapter layers?
  • limited type-safety?

What we want

  • `case class`-based model
  • actor-based APIs (message protocols)
  • functions as values
  • Scala/Akka Futures
  • Scala collections
  • type classes
  • type safety

What we also want

  • unified thread-pool mgmt. (Akka dispatchers)
  • unified configuration (Typesafe config)
  • unified logging (Akka event bus)
  • unified debugging / optimization
    (e.g. with Typesafe console)

We want to build on Akka!

  • our whole application that is,
    not just a few bits!
  • same principles, concepts and coding
    style in all layers of the stack:
    much easier problem analysis & tuning

spray builds on Akka

  • entirely built in Scala, no wrapping of Java libraries
  • fully async and non-blocking
  • only one type of active component in all layers: actors
  • core API style: message protocol
  • actor-friendly (e.g. "tell don't ask")
  • fast, lightweight, modular, testable

spray components

[Figure: spray components]

plus: spray-servlet, spray-testkit, ...

HTTP model

  • `case class`-based data model
  • high-level abstractions for most things HTTP
  • fully immutable, little logic
  • predefined instances for common media types, status codes, encodings, charsets, cache-control directives, etc.
  • open for extension
    (e.g. registration of custom media types)

HTTP model: show me code


case class HttpRequest(
  method: HttpMethod = HttpMethods.GET,
  uri: Uri = Uri./,
  headers: List[HttpHeader] = Nil,
  entity: HttpEntity = HttpEntity.Empty,
  protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
					

HTTP model: show me code


case class HttpResponse(
  status: StatusCode = StatusCodes.OK,
  entity: HttpEntity = HttpEntity.Empty,
  headers: List[HttpHeader] = Nil,
  protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
					

HTTP model: show me code


case class Uri(             // proper RFC 3986
  scheme: String,           // compliant,
  authority: Authority,     // immutable
  path: Path,               // URI model
  query: Query,             // with a fast,
  fragment: Option[String]) // custom parser
					

HTTP model: show me code


case class `Accept-Charset`(charsetRanges: Seq[HttpCharsetRange])
  extends HttpHeader

case class `Accept-Encoding`(encodings: Seq[HttpEncodingRange])
  extends HttpHeader

case class `Set-Cookie`(cookie: HttpCookie)
  extends HttpHeader

case class RawHeader(name: String, value: String)
  extends HttpHeader
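
Because the model consists of immutable case classes, messages can be built with named arguments, changed via `copy`, and taken apart by pattern matching. The sketch below shows this with deliberately simplified stand-in types (`Response`, `Header` and the helper functions are hypothetical, not spray's real classes).

```scala
object HttpModelSketch {
  // Simplified stand-ins for illustration only; spray's real model is richer.
  sealed trait Header
  final case class `Content-Type`(mediaType: String) extends Header
  final case class RawHeader(name: String, value: String) extends Header

  final case class Response(
    status: Int = 200,
    entity: String = "",
    headers: List[Header] = Nil)

  /** Returns a modified copy; the original response is left untouched. */
  def withHeader(r: Response, h: Header): Response =
    r.copy(headers = h :: r.headers)

  /** Pattern matching pulls typed values out of the header list. */
  def mediaTypeOf(r: Response): Option[String] =
    r.headers.collectFirst { case `Content-Type`(mt) => mt }
}
```

Since nothing is mutated in place, such values can be passed between actors as messages without defensive copying.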
					

Low-level HTTP layer

  • sits directly on top of Akka IO
  • performs TCP <-> HTTP "translation"
  • cleanly separated layer of actors
    provided as an Akka Extension
  • implements "essentials",
    no higher-level features (like file serving)

The IO stack

[Figure: the IO stack]

Towards Akka IO (briefly)

  • network communication is
    • packet-based
    • no continuous flow of bytes
    • rather: chunked into messages

Towards Akka IO (2)

  • old-school Java IO (before NIO): stream-based
    • input: read a stream,
      block if no data
    • output: write to stream,
      block if sending is not currently possible
  • paradigm mismatch: stream-based vs. message-based

Towards Akka IO (3)

  • Java NIO ("new" IO):
    • extended API with support for
      async, non-blocking IO ops
  • but:
    • hard to use
    • still not message-based

Akka IO

  • bridges the gap between Java NIO and Akka actors
  • msg-based API surfaces the nature of the network
  • events come in, e.g. «connection established»,
    «bytes received», «connection closed», «error occurred»
  • commands go out from our side, e.g.
    «attempt connecting», «send bytes», «close connection»
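
The event/command split maps naturally onto two sealed message hierarchies. The sketch below uses hypothetical message names in plain Scala (Akka IO's actual protocol lives in `akka.io.Tcp` and differs in detail); it only shows the shape of a handler that reacts to incoming events by issuing commands.

```scala
object IoProtocolSketch {
  // Events: things the network tells us (names are illustrative).
  sealed trait Event
  case object ConnectionEstablished extends Event
  final case class BytesReceived(data: String) extends Event
  case object ConnectionClosed extends Event
  final case class ErrorOccurred(cause: String) extends Event

  // Commands: things we ask the network layer to do.
  sealed trait Command
  final case class SendBytes(data: String) extends Command
  case object CloseConnection extends Command

  /** A trivial echo handler: maps each incoming event to the commands to issue. */
  def echoHandler(event: Event): List[Command] = event match {
    case ConnectionEstablished => List(SendBytes("hello\n"))
    case BytesReceived("quit") => List(CloseConnection)
    case BytesReceived(data)   => List(SendBytes(data))
    case ConnectionClosed      => Nil
    case ErrorOccurred(_)      => List(CloseConnection)
  }
}
```

In an actor-based design the handler would be an actor's `receive` block and the commands would be sent to the connection actor rather than returned as a list.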

spray-can

  • provides message-based APIs on multiple levels
    (server-side: connection-level,
    client-side: connection-, host- and request-level)
  • maximum throughput with acceptable latency
  • massive numbers of concurrent connections
  • HTTP pipelining
  • chunked messages (streaming)
  • SSL/TLS encryption

spray-can: show me code


class PingPongService extends Actor {
  def receive = {
    // when a new connection comes in we register
    // ourselves as the connection handler
    case _: Http.Connected ⇒ sender ! Http.Register(self)

    // can you guess what this does?
    case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒
      sender ! HttpResponse(entity = "PONG")
  }
}
					

spray-routing

  • internal DSL for the direct interface layer to the application
  • type-safe, yet flexible (thanks to shapeless)
  • much more than just routing: behavior definition
  • small and simple building blocks: directives
  • highly composable
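
To give an intuition for why directives compose so well, here is a toy model in plain Scala. It is not how spray-routing is implemented (all names and types below are simplified assumptions): a route is just a function that either answers a request or rejects it, and `~` tries the next alternative on rejection.

```scala
object RoutingSketch {
  final case class Request(method: String, path: String)
  final case class Response(status: Int, body: String)

  /** A route either handles a request (Some) or rejects it (None). */
  type Route = Request => Option[Response]

  /** Adds the `~` operator: try `first`, fall back to `second` on rejection. */
  implicit class RouteOps(first: Route) {
    def ~(second: Route): Route = req => first(req).orElse(second(req))
  }

  // "Directives": small functions that wrap an inner route.
  def method(name: String)(inner: Route): Route =
    req => if (req.method == name) inner(req) else None
  def get(inner: Route): Route = method("GET")(inner)
  def put(inner: Route): Route = method("PUT")(inner)

  def path(p: String)(inner: Route): Route =
    req => if (req.path == p) inner(req) else None

  def complete(body: String): Route = _ => Some(Response(200, body))

  // Directives nest and chain much like in the real DSL.
  val route: Route =
    path("/order") {
      get { complete("Received GET request") } ~
      put { complete("Received PUT request") }
    }
}
```

Each building block does one small thing, and larger behavior emerges purely from nesting and chaining them.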

API Layer: How it fits in

[Figure: application, API layer]

API Layer Responsibilities

  • request routing based on method, path, query, entity
  • (Un)marshalling to / from domain objects
  • encoding / decoding (compression)
  • authentication / authorization
  • caching and serving static content
  • RESTful error handling

API Layer: show me code


class MyServiceActor extends HttpServiceActor {
  def receive = runRoute {
    path("order" / HexIntNumber) { id =>
      get {
        complete {
          "Received GET request for order " + id
        }
      } ~
      put {
        complete {
          "Received PUT request for order " + id
        }
      }
    }
  }
}
				

Predefined Directives

alwaysCache, anyParam, anyParams, authenticate, authorize, autoChunk, cache, cachingProhibited, cancelAllRejections, cancelRejection, clientIP, complete, compressResponse, compressResponseIfRequested, cookie, decodeRequest, decompressRequest, delete, deleteCookie, detach, dynamic, dynamicIf, encodeResponse, entity, extract, failWith, formField, formFields, get, getFromBrowseableDirectories, getFromBrowseableDirectory, getFromDirectory, getFromFile, getFromResource, getFromResourceDirectory, handleExceptions, handleRejections, handleWith, head, headerValue, headerValueByName, headerValuePF, hextract, host, hostName, hprovide, jsonpWithParameter, listDirectoryContents, logRequest, logRequestResponse, logResponse, mapHttpResponse, mapHttpResponseEntity, mapHttpResponseHeaders, mapHttpResponsePart, mapInnerRoute, mapRejections, mapRequest, mapRequestContext, mapRouteResponse, mapRouteResponsePF, method, noop, onComplete, onFailure, onSuccess, optionalCookie, optionalHeaderValue, optionalHeaderValueByName, optionalHeaderValuePF, options, overrideMethodWithParameter, parameter, parameterMap, parameterMultiMap, parameters, parameterSeq, pass, patch, path, pathPrefix, pathPrefixTest, pathSuffix, pathSuffixTest, post, produce, provide, put, rawPath, rawPathPrefix, rawPathPrefixTest, redirect, reject, rejectEmptyResponse, requestEncodedWith, requestEntityEmpty, requestEntityPresent, respondWithHeader, respondWithHeaders, respondWithLastModifiedHeader, respondWithMediaType, respondWithSingletonHeader, respondWithSingletonHeaders, respondWithStatus, responseEncodingAccepted, rewriteUnmatchedPath, routeRouteResponse, scheme, schemeName, setCookie, unmatchedPath, validate

Real-World Example


lazy val route = {
  encodeResponse(Gzip) {
    path("") {
      get {
        redirect("/doc")
      }
    } ~
    pathPrefix("api") {
      jsonpWithParameter("callback") {
        path("top-articles") {
          get {
            parameter("max".as[Int]) { max =>
              validate(max >= 0, "query parameter 'max' must be >= 0") {
                complete {
                  (topArticlesService ? max).mapTo[Seq[Article]]
                }
              }
            }
          }
        } ~
        tokenAuthenticate { user =>
          path("ranking") {
            get {
              countAndTime(user, "ranking") {
                parameters("fixed" ? 0, "mobile" ? 0, "sms" ? 0, "mms" ? 0,
                           "data" ? 0).as(RankingDescriptor) { descr =>
                  complete {
                    (rankingService ? Ranking(descr)).mapTo[RankingResult]
                  }
                }
              }
            }
          } ~
          path("accounts") {
            post {
              authorize(user.isAdmin) {
                content(as[AccountDetails]) { details =>
                  complete {
                    (accountService ? NewAccount(details)).mapTo[OpResult]
                  }
                }
              }
            }
          } ~
          path("account" / IntNumber) { accountId =>
            get { ... } ~
            put { ... } ~
            delete { ... }
          }
        }
      } ~
      pathPrefix("v1") {
        proxyToDjango
      }
    } ~
    pathPrefix("doc") {
      respondWithHeader(`Cache-Control`(`max-age`(3600))) {
        transformResponse(_.withContentTransformed(markdown2Html)) {
          getFromResourceDirectory("doc/root",
                                   pathRewriter = appendFileExt)
        }
      }
    }
  } ~
  cacheIfEnabled {
    encodeResponse(Gzip) {
      getFromResourceDirectory("public")
    }
  }
}
				

Best Practices

  • Keep route structure clean and readable,
    pull out all logic into custom directives
  • Don't let the API layer leak into the application
  • Use (Un)marshalling infrastructure
  • Think about error handling/reporting
    right from the start
  • Use sbt-revolver for fast dev turn-around

There is more...

Current State

  • Latest release: spray 1.[0123].1
  • Currently: spray becomes akka-http
  • Play will gradually move onto akka-http
  • Then: Improvements, features, rounding off
    • websockets
    • HTTP 2.0
    • ...

akka-http

  • Only two modules (down from 10)
    • akka-http-core
    • akka-http
  • No more spray-servlet (at least for now)
  • Java APIs
  • Main improvement:
    Everything now based on Reactive Streams

Towards Reactive Streams

  • What is a stream?
    • Ephemeral flow of data
    • Focused on describing transformation
    • Possibly unbounded in size

Common Uses of Streams

  • Bulk data transfer
  • Real-time data sources
  • Batch processing of large data sets
  • Monitoring and analytics

Example: Scaling Logic

[Figure: scaling]

Async Boundary

[Figure: scaling]

Async Boundaries Everywhere

  • Between actors
  • Between threads
  • Between CPUs
  • Between network nodes
  • Between applications

Async Boundary

  • Data elements flow downstream
  • Demand flows upstream
  • Data items flow only when there is demand
  • Recipient is in control of incoming data rate
  • Data in flight is bounded by signaled demand

Dynamic Push/Pull

  • “Push” behavior when consumer is faster
  • “Pull” behavior when producer is faster
  • Switches automatically between these
  • Batching demand allows batching data

The Reactive Streams Project

  • Participants: Engineers from
    • Netflix
    • Twitter
    • Red Hat
    • Pivotal
    • Typesafe
    • spray.io

The Motivation

  • All participants had the same basic problem
  • All are building tools for their community
  • A common solution benefits everybody
  • Interoperability to make best use of efforts
  • You can say:
    
        twitterStream
          .produceTo(rxjavaObservable)
          .produceTo(reactorStream)
          .produceTo(akkaStream)
              

Recipe for Success

  • Minimal interfaces
  • Rigorous specification of semantics
  • Full TCK for verification of implementation
  • Complete freedom for many idiomatic APIs

Show me Code


trait Publisher[T] {
  def subscribe(sub: Subscriber[T]): Unit
}
trait Subscription {
  def requestMore(n: Int): Unit
  def cancel(): Unit
}
trait Subscriber[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(elem: T): Unit
  def onError(thr: Throwable): Unit
  def onComplete(): Unit
}
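
A minimal sketch of how these interfaces carry demand upstream and data downstream. It repeats the draft interfaces from above to stay self-contained; `SeqPublisher` and `CollectingSubscriber` are hypothetical, single-threaded helpers written for this example and ignore most of the rules a real implementation has to obey.

```scala
object ReactiveStreamsSketch {
  // Interfaces as shown above (an early draft of the specification).
  trait Publisher[T] { def subscribe(sub: Subscriber[T]): Unit }
  trait Subscription { def requestMore(n: Int): Unit; def cancel(): Unit }
  trait Subscriber[T] {
    def onSubscribe(s: Subscription): Unit
    def onNext(elem: T): Unit
    def onError(thr: Throwable): Unit
    def onComplete(): Unit
  }

  /** Emits elements only while there is outstanding demand. */
  final class SeqPublisher[T](elems: Seq[T]) extends Publisher[T] {
    def subscribe(sub: Subscriber[T]): Unit = {
      val it = elems.iterator
      var demand = 0L
      var emitting = false
      var done = false
      sub.onSubscribe(new Subscription {
        def requestMore(n: Int): Unit = {
          demand += n
          if (!emitting) { // guard against re-entrant calls from onNext
            emitting = true
            while (!done && demand > 0 && it.hasNext) {
              demand -= 1
              sub.onNext(it.next())
            }
            if (!done && !it.hasNext) { done = true; sub.onComplete() }
            emitting = false
          }
        }
        def cancel(): Unit = done = true
      })
    }
  }

  /** Collects everything it receives, asking for `batch` elements at a time. */
  final class CollectingSubscriber[T](batch: Int) extends Subscriber[T] {
    private var subscription: Subscription = null
    private var outstanding = 0
    var received = Vector.empty[T]
    var maxOutstanding = 0 // largest demand ever signalled but not yet served
    var completed = false

    private def request(): Unit = {
      outstanding += batch
      maxOutstanding = math.max(maxOutstanding, outstanding)
      subscription.requestMore(batch)
    }
    def onSubscribe(s: Subscription): Unit = { subscription = s; request() }
    def onNext(elem: T): Unit = {
      received :+= elem
      outstanding -= 1
      if (outstanding == 0) request() // ask again only once the batch is consumed
    }
    def onError(thr: Throwable): Unit = throw thr
    def onComplete(): Unit = completed = true
  }
}
```

The subscriber never has more than `batch` elements of unserved demand, so the amount of data in flight stays bounded by what it asked for. Note that method names changed as the specification evolved: the released Reactive Streams interfaces use `request(n: Long)` instead of `requestMore`.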

Getting started

THANK YOU!

Q & A