akka

Reactive Streams & Akka HTTP

2015-02-12 @ Berlin Scala User Group

Mathias Doenitz

This presentation: http://spray.io/berlin/

PART 1:
Reactive Streams

Our Hardware base is changing!

35 years of CPU trends
[Figure: CPU trends over 35 years]
Source: Chuck Moore

The new opponent: Amdahl's Law

[Figure: Amdahl's Law]
Parallelisation is key!
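
Amdahl's Law puts a ceiling on what more cores can buy: if a fraction p of the work can run in parallel on n cores, the speedup is 1 / ((1 - p) + p / n). A minimal sketch of that formula follows; the class name and the 95% sample value are assumptions chosen for illustration, not figures from the slides.

```java
// Illustrative sketch: class name and sample numbers are assumptions.
final class Amdahl {

    /** Speedup for a workload whose fraction p (0..1) can run in parallel on n cores. */
    static double speedup(double p, int n) {
        return 1.0 / ((1.0 - p) + p / n);
    }

    /** Upper bound on the speedup as the number of cores grows without limit. */
    static double limit(double p) {
        return 1.0 / (1.0 - p);
    }

    public static void main(String[] args) {
        for (int n : new int[] {2, 8, 64, 1024}) {
            System.out.printf("p=0.95, n=%d -> speedup %.2f%n", n, speedup(0.95, n));
        }
        System.out.printf("p=0.95, n unbounded -> speedup %.2f%n", limit(0.95));
    }
}
```

Even with 95% of the work parallelisable, the serial 5% caps the speedup at 20x, which is why shrinking the serial part matters as much as adding cores.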

Our old tools don't cut it

  • Threads (programmed directly)
    • high memory overhead
    • starting/stopping is expensive
    • inter-thread communication entirely left to the user
  • Locks/Mutexes/Semaphores/`synchronized`/`volatile`
    • too little sync: race conditions, wrong results
    • too much sync: deadlocks, poor performance
    • very hard to use correctly
  • Too low-level!

Selected concurrency abstractions

[Figure: Concurrency Abstractions Overview]

Reactive Streams Basics

  • What is a stream?
  • Ephemeral flow of data items, commands,
    other elements of any kind
  • Focused on describing transformation
    and combination/composition
  • Possibly unbounded in size
  • NOT a data structure!

Common Uses of Streams

  • Bulk data transfer
  • Real-time data sources
  • Batch processing of large data sets
  • Monitoring and analytics
  • Structuring and Scaling Business Logic

Example: Scaling Logic

[Figures: scaling]

Taking things further

[Figures: scaling]
Fine-grained Parallelisation is key!

Common problem: Backpressure

[Figures: Backpressure needed]

Critical Point: Async Boundary

[Figures: Async Boundary]

Async Boundary

  • Data elements flow downstream
  • Demand flows upstream
  • Data items flow only when there is demand
  • Recipient is in control of incoming data rate
  • Data in flight is bounded by signaled demand
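
The rules above can be sketched with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams interfaces. Everything here (`DemandDemo`, `RangePublisher`, `RecordingSubscriber`) is a hypothetical, single-threaded illustration and not a spec-compliant publisher: it only shows that elements travel downstream strictly in response to demand signalled upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative, single-threaded sketch; all names are assumptions.
final class DemandDemo {

    /** Emits the integers from..to, but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int from;
        private final int to;

        RangePublisher(int from, int to) { this.from = from; this.to = to; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = from;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;  // a real publisher signals onError for n <= 0
                    demand += n;
                    if (emitting) return;        // re-entrant call from onNext: the loop below picks it up
                    emitting = true;
                    while (!done && demand > 0 && next <= to) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > to) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests `initialDemand` elements once and records what arrives. */
    static final class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        Flow.Subscription subscription;
        private final long initialDemand;

        RecordingSubscriber(long initialDemand) { this.initialDemand = initialDemand; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initialDemand); }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        RecordingSubscriber sub = new RecordingSubscriber(3);
        new RangePublisher(1, 1_000_000).subscribe(sub);
        System.out.println(sub.received);  // [1, 2, 3]: only the requested elements
        sub.subscription.request(2);       // more demand upstream, more data downstream
        System.out.println(sub.received);  // [1, 2, 3, 4, 5]
    }
}
```

The publisher could produce a million elements, yet the recipient decides how many are ever in flight.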

Dynamic Push/Pull

  • “Push” behavior when consumer is faster
  • “Pull” behavior when producer is faster
  • Switches automatically between these
  • Batching demand allows batching data
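
Batched demand can be sketched with the JDK's `SubmissionPublisher` (Java 9+), used here as a stand-in for a Reactive Streams implementation. The subscriber below is a hypothetical example: it requests `batchSize` elements at a time and signals new demand only once the previous batch has arrived, so the producer is free to push within a batch and has to wait between batches.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; class and method names are assumptions.
final class BatchedDemand {

    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int batchSize;
        private Flow.Subscription subscription;
        private int outstanding;  // elements requested but not yet received

        BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            outstanding = batchSize;
            s.request(batchSize);              // initial demand: one batch
        }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (--outstanding == 0) {          // batch consumed: signal demand for the next one
                outstanding = batchSize;
                subscription.request(batchSize);
            }
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..elements and returns what the batching subscriber received. */
    static List<Integer> run(int elements, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= elements; i++) {
                publisher.submit(i);           // blocks while the subscriber's buffer is full
            }
        }                                      // close() signals onComplete after buffered elements
        try {
            if (!subscriber.done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 4));
    }
}
```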

Async Boundaries are Everywhere

  • Between actors
  • Between threads
  • Between CPUs
  • Between network nodes
  • Between applications

Pipeline Processing Done Right

[Figures: Pipeline Done Right]

Continuous Pipelines across Machines

[Figures: Multi-machine Pipeline]

The Reactive Streams Project

  • Participants: Engineers from
    • Netflix
    • Twitter
    • Red Hat
    • Pivotal
    • Typesafe

Goals

  • A common, minimal standard interface
    for handing data across async boundaries
  • Complete freedom for many idiomatic APIs
  • Interoperability to make best use of efforts
  • You should be able to say:
    
      twitterStream
        .produceTo(rxjavaObservable)
        .produceTo(reactorStream)
        .produceTo(akkaStream)
            

Reactive Stream API


public interface Publisher<T> {
  public void subscribe(Subscriber<? super T> s);
}
public interface Subscription {
  public void request(long n);
  public void cancel();
}
public interface Subscriber<T> {
  public void onSubscribe(Subscription s);
  public void onNext(T t);
  public void onError(Throwable t);
  public void onComplete();
}
        

Not for user consumption!
(Use RS Impl Library instead)
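
What "use a library instead" can look like, sketched with the JDK's `SubmissionPublisher` (Java 9+) as a stand-in for a Reactive Streams implementation library: `consume(...)` installs a ready-made subscriber, so no `onSubscribe`/`onNext`/`onError`/`onComplete` is written by hand. The class and method names are assumptions for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Illustrative sketch; class and method names are assumptions.
final class UseALibrary {

    /** Publishes all items and returns what the library-provided subscriber consumed. */
    static List<String> publishAndCollect(List<String> items) {
        List<String> seen = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // The library supplies the Subscriber and handles the demand signalling.
        CompletableFuture<Void> completion = publisher.consume(seen::add);
        items.forEach(publisher::submit);
        publisher.close();   // signals onComplete after the buffered items
        completion.join();   // completes once onComplete has been received
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("a", "b", "c")));
    }
}
```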

akka-stream: basic concepts

  • `Source[T]`: the open end of a pipeline producing `T`s
  • `Sink[T]`: an "end-piece" for taking in `T`s
  • `Flow[A, B]`: an unconnected piece of pipeline
  • Generally, all abstractions are reusable
  • "Materialization":
    The process of starting an actual stream instance

akka-stream: simple stream example


Source(stockTickerPublisher)                 // Source[Tick]
  .filter(_.symbol == "AAPL")                // Source[Tick]       
  .buffer(100000, OverflowStrategy.DropHead) // Source[Tick]
  .splitWhen(x => isNewDay(x.timeStamp))     // Source[Source[Tick]]
  .headAndTail                               // Source[(Tick, Source[Tick])]
  .map { case (head, tail) =>
     head -> tail.groupedWithin(1000, 1.second)
  }                                          // Source[(Tick, Source[Seq[Tick]])]
  .via(someFlow)                             // Source[RichTick]
  .map(toCandleStickChartColumn)             // Source[CandleStickChartColumn]
  .to(candleStickChartSink)                  // RunnableFlow
  .run()                                     // MaterializedMap
        

akka-stream: flow graph example

[Figure: Flowgraph]

FlowGraph { implicit b ⇒
  val bcast = Broadcast[T]
  val merge = Merge[T]

  source ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> sink
  bcast ~> f4 ~> merge
}.run()
        

Reactive Streams: Status

  • Reactive Streams API Spec 1.0 almost complete
  • TCK (Technology Compatibility Kit) available
  • Several implementations in various states of progress
  • For Scala: akka-stream currently the most idiomatic
  • APIs becoming more stable,
    a lot of new ground had to be broken

More Q & A

PART 2:
Akka HTTP

Why do we all need a
(proper) HTTP stack?

We live in a multi-everything world

  • Multi-Threaded
  • Multi-Cored
  • Multi-Machined
  • Multi-Data-Centered

All our applications are distributed

  • Across threads and cores
  • Across machines and data centers
  • Across organisations
  • Across the world!

Akka Toolkit Promise

"Build powerful concurrent & distributed
applications more easily"

Source: http://akka.io

Distribution implies integration

  • Within one machine
    e.g. akka-actor
  • Between your own (sub-)systems
    e.g. akka-remoting & akka-cluster
  • With other (external) systems
    e.g. akka-http
  • HTTP is the most successful
    external integration protocol to date!

Akka-Http Design Goals

  • Max throughput with acceptable latency
  • Properly scale to "unlimited" number
    of connections, across all cores
  • Server- and client-side support
  • Open, composable APIs ("not-a-framework style")
  • Focus: HTTP integration layers

Akka-Http Implementation

  • Fully async & non-blocking
  • Actor-friendly, but centered around non-actor APIs
  • Lightweight, modular, testable
  • Fully decoupleable from underlying TCP interface
  • Based on spray.io codebase and experience

spray heritage

  • Immutable, case-class-based HTTP model
  • Efficient HTTP parsing and rendering logic
  • Powerful DSL for server-side
    REST API definition
  • Route structure testkit

spray weaknesses

  • Handling of chunked requests is clunky, incomplete
  • Dealing with large message entities can be difficult
  • High-level routing DSL sometimes unintuitive,
    some mistakes not caught at compile-time
  • Deep implicit structures, sometimes hard to debug
  • Missing features (e.g. websocket support)

Proxying Large Responses

[Figures: Backpressure needed]

akka-http is spray 2.0

  • Across-the-board polishing,
    addressing of weaknesses
  • Java APIs
  • Simplified module structure
  • Rewrite of core components
    to be based on Reactive Streams

Streams in akka-http

  • Requests on one HTTP connection
  • Responses on one HTTP connection
  • Chunks of a chunked message
  • Bytes of a message entity

HTTP Stream Interfaces

[Figure: Reactive HTTP Streams]

The Application Stack

[Figure: stack]

akka-io

  • Bridges the gap between Java NIO
    and Akka actors / streams
  • Provides both msg-based
    as well as stream-based API
  • Supports TCP, UDP and SSL/TLS

akka-http-core

  • Directly sits on top of Akka IO
  • Performs TCP ⇄ HTTP "translation"
  • Cleanly separated layer of stream transformations
    provided as an Akka Extension
  • Implements HTTP "essentials",
    no higher-level features (like file serving)

akka-http

  • Provides higher-level server- and client-side APIs
  • "Unmarshalling" custom types from HttpEntities
  • "Marshalling" custom types to HttpEntities
  • (De)compression (GZip / Deflate)
  • Routing DSLs

HTTP Server API (1/3)


def bind(endpoint: InetSocketAddress, ...): Http.ServerBinding
  
object Http {
  trait ServerBinding {
    def connections: Source[IncomingConnection]

    def localAddress(mm: MaterializedMap): Future[InetSocketAddress]

    def unbind(mm: MaterializedMap): Future[Unit]

    /* ... plus `startHandlingWithXXX(...)` sugar ... */
  }
}
        

HTTP Server API (2/3)


object Http {
  trait IncomingConnection {
    def localAddress: InetSocketAddress
    def remoteAddress: InetSocketAddress

    def handleWith
      (handler: Flow[HttpRequest, HttpResponse]): MaterializedMap

    def handleWithSyncHandler
      (handler: HttpRequest ⇒ HttpResponse): MaterializedMap

    def handleWithAsyncHandler
      (handler: HttpRequest ⇒ Future[HttpResponse]): MaterializedMap
  }
}
        

HTTP Server API (3/3)


/**
 * Transforms a given HTTP-level server Flow
 * into a lower-level TCP transport flow.
 */
def serverFlowToTransport(
  serverFlow: Flow[HttpRequest, HttpResponse],
  ...): Flow[ByteString, ByteString]
        

Simple HTTP Server


val binding = Http().bind("localhost", 8080)

binding startHandlingWithSyncHandler {

  case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒
    HttpResponse(entity = "PONG!")

  case _ ⇒ // catch all
    HttpResponse(404, entity = "Unknown resource!")
}
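
One way to read `handleWithSyncHandler` versus `handleWithAsyncHandler`: a synchronous handler is a plain function from request to response, and it can always be lifted into an asynchronous one. The sketch below shows this with plain Java types; `Request`, `Response`, `PING` and `async` are hypothetical stand-ins, not akka-http classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative sketch; all types and names are assumptions, not akka-http API.
final class Handlers {

    static final class Request {
        final String method;
        final String path;
        Request(String method, String path) { this.method = method; this.path = path; }
    }

    static final class Response {
        final int status;
        final String body;
        Response(int status, String body) { this.status = status; this.body = body; }
    }

    /** Synchronous handler mirroring the ping server: one match, one catch-all. */
    static final Function<Request, Response> PING = request ->
        request.method.equals("GET") && request.path.equals("/ping")
            ? new Response(200, "PONG!")
            : new Response(404, "Unknown resource!");

    /** Lifts a synchronous handler into an asynchronous one. */
    static Function<Request, CompletableFuture<Response>> async(Function<Request, Response> sync) {
        return request -> CompletableFuture.completedFuture(sync.apply(request));
    }

    public static void main(String[] args) {
        Response response = async(PING).apply(new Request("GET", "/ping")).join();
        System.out.println(response.status + " " + response.body);
    }
}
```

Because the handler is an ordinary function, it can be tested without binding a socket.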
        

Basic HTTP Client API (1/2)


def outgoingConnection(endpoint: InetSocketAddress, ...)
  : Http.OutgoingConnection

object Http {
  trait OutgoingConnection {
    def remoteAddress: InetSocketAddress
    def localAddress(mm: MaterializedMap): Future[InetSocketAddress]
    def flow: Flow[HttpRequest, HttpResponse]
  }
}
        

Basic HTTP Client API (2/2)


/**
 * Transforms the given low-level TCP client transport
 * Flow into a higher-level HTTP client flow.
 */
  def transportToConnectionClientFlow(
    transport: Flow[ByteString, ByteString],
    ...): Flow[HttpRequest, HttpResponse]
        

HTTP model

  • `case class`-based data model
  • High-level abstractions for most things HTTP
  • Fully immutable, little logic
  • Predefines common media types, status codes, encodings, charsets, cache-control directives, etc.
  • Open for extension (e.g. custom media types)

HTTP Request


case class HttpRequest(
  method: HttpMethod = HttpMethods.GET,
  uri: Uri = Uri./,
  headers: immutable.Seq[HttpHeader] = Nil,
  entity: RequestEntity = HttpEntity.Empty,
  protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
        

HTTP Response


case class HttpResponse(
  status: StatusCode = StatusCodes.OK,
  headers: immutable.Seq[HttpHeader] = Nil,
  entity: ResponseEntity = HttpEntity.Empty,
  protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
        

HTTP model: Uri


case class Uri(             // proper RFC 3986
  scheme: String,           // compliant,
  authority: Authority,     // immutable
  path: Path,               // URI model
  query: Query,             // with a fast,
  fragment: Option[String]) // custom parser
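
The same five components can be seen by taking a URI apart with the JDK's `java.net.URI`. This is only an illustration of the structure: `java.net.URI` follows the older RFC 2396 rather than RFC 3986 and is not the akka-http `Uri` class; the `UriParts` helper and the sample URI are assumptions.

```java
import java.net.URI;

// Illustrative sketch; helper name and sample URI are assumptions.
final class UriParts {

    /** Returns scheme, authority, path, query and fragment joined by '|'. */
    static String describe(String raw) {
        URI uri = URI.create(raw);
        return String.join("|",
            uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment());
    }

    public static void main(String[] args) {
        // prints http|example.com:8080|/order/2a|max=10|top
        System.out.println(describe("http://example.com:8080/order/2a?max=10#top"));
    }
}
```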
        

HTTP Entity (1/2)


sealed trait HttpEntity

sealed trait ResponseEntity extends HttpEntity

sealed trait RequestEntity extends ResponseEntity

// can be used for any message (request or response)
type MessageEntity = RequestEntity

sealed trait BodyPartEntity extends HttpEntity

// can be used for messages as well as bodyparts
sealed trait UniversalEntity extends MessageEntity
                             with BodyPartEntity
        

HTTP Entity (2/2)


object HttpEntity {
  case class Strict(contentType: ContentType,
    data: ByteString) extends UniversalEntity

  case class Default(contentType: ContentType, contentLength: Long,
    data: Source[ByteString]) extends UniversalEntity

  case class Chunked(contentType: ContentType,
    chunks: Source[ChunkStreamPart]) extends MessageEntity

  case class CloseDelimited(contentType: ContentType,
    data: Source[ByteString]) extends ResponseEntity
  
  case class IndefiniteLength(contentType: ContentType,
    data: Source[ByteString]) extends BodyPartEntity                         
}
        

Exemplary HTTP Headers


case class `Accept-Charset`(charsetRanges: immutable.Seq[HttpCharsetRange])
  extends HttpHeader

case class `Cache-Control`(directives: immutable.Seq[CacheDirective])
  extends HttpHeader

case class `Set-Cookie`(cookie: HttpCookie)
  extends HttpHeader

case class RawHeader(name: String, value: String)
  extends HttpHeader
        

Benefits over spray design

  • Properly typed model for requests,
    responses and nested entities
  • Entities can have arbitrary size
  • Chunks now on an inner level,
    below the messages
  • Can now receive message headers
    before the entity

Server-Side Routing DSL

  • Internal DSL for the interface
    layer to the application
  • Type-safe, yet flexible
  • Much more than just routing:
    behavior definition
  • Small and simple building blocks: directives
  • Highly composable

Server-Side API Layer: Overview

[Figures: application, API layer]

API Layer Responsibilities

  • Request routing based on method, path, query, entity
  • (Un)marshalling to / from domain objects
  • Encoding / decoding (compression)
  • Authentication / authorization
  • Caching and serving static content
  • RESTful error handling

Routing DSL: show me code


import Directives._

val binding = Http().bind("localhost", 8080)

binding startHandlingWith {
  path("order" / HexIntNumber) { id =>
    get {
      complete(s"Received GET for order $id")
    } ~
    put {
      complete(s"Received PUT for order $id")
    }
  }
}
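
How small directives compose can be sketched without the DSL: treat a route as a function that either answers a request or rejects it, and let a combinator try the next alternative on rejection, much like `~` above. This is a hypothetical model in plain Java, not how akka-http implements routing; `Route`, `orElse`, `method` and `order` are illustrative names.

```java
import java.util.Optional;
import java.util.function.Function;

// Illustrative model of composable routes; all names are assumptions.
final class MiniRouting {

    static final class Request {
        final String method;
        final String path;
        Request(String method, String path) { this.method = method; this.path = path; }
    }

    /** A route either produces a response body or rejects the request (empty). */
    interface Route {
        Optional<String> handle(Request request);

        /** Counterpart of `~`: try this route, fall back to `other` on rejection. */
        default Route orElse(Route other) {
            return request -> {
                Optional<String> result = handle(request);
                return result.isPresent() ? result : other.handle(request);
            };
        }
    }

    /** Passes the request to `inner` only if the HTTP method matches. */
    static Route method(String name, Route inner) {
        return request -> request.method.equals(name) ? inner.handle(request) : Optional.empty();
    }

    /** Matches "/order/<hex number>" and hands the parsed number to `inner`. */
    static Route order(Function<Integer, Route> inner) {
        return request -> {
            String prefix = "/order/";
            if (!request.path.startsWith(prefix)) return Optional.empty();
            try {
                int id = Integer.parseInt(request.path.substring(prefix.length()), 16);
                return inner.apply(id).handle(request);
            } catch (NumberFormatException e) {
                return Optional.empty();  // not a hex number: reject
            }
        };
    }

    static final Route ROUTE = order(id ->
        method("GET", request -> Optional.of("Received GET for order " + id))
            .orElse(method("PUT", request -> Optional.of("Received PUT for order " + id))));

    public static void main(String[] args) {
        System.out.println(ROUTE.handle(new Request("GET", "/order/ff")));
        System.out.println(ROUTE.handle(new Request("DELETE", "/order/ff")));
    }
}
```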
        

Predefined Directives (1.0-M1)

authorize, cancelRejection, cancelRejections, complete, completeOrRecoverWith, completeWith, compressResponse, compressResponseIfRequested, conditional, cookie, decodeRequest, decompressRequest, delete, deleteCookie, encodeResponse, entity, extract, extractExecutionContext, extractFlowMaterializer, extractHost, extractLog, extractRequest, extractScheme, extractSettings, extractUnmatchedPath, extractUri, failWith, formField, formFields, get, getFromBrowseableDirectories, getFromBrowseableDirectory, getFromDirectory, getFromFile, getFromResource, getFromResourceDirectory, handleExceptions, handleRejections, handleWith, head, headerValue, headerValueByName, headerValueByType, headerValuePF, host, listDirectoryContents, logRequest, logRequestResult, logResult, mapInnerRoute, mapRejections, mapRequest, mapRequestContext, mapResponse, mapResponseEntity, mapResponseHeaders, mapRouteResult, mapRouteResultFuture, mapRouteResultPF, mapRouteResultWith, mapRouteResultWithPF, mapSettings, mapUnmatchedPath, method, onComplete, onSuccess, optionalCookie, optionalHeaderValue, optionalHeaderValueByName, optionalHeaderValueByType, optionalHeaderValuePF, options, overrideMethodWithParameter, overrideStatusCode, parameter, parameterMap, parameterMultiMap, parameters, parameterSeq, pass, patch, path, pathEnd, pathEndOrSingleSlash, pathPrefix, pathPrefixTest, pathSingleSlash, pathSuffix, pathSuffixTest, post, provide, put, rawPathPrefix, rawPathPrefixTest, recoverRejections, recoverRejectionsWith, redirect, reject, requestEncodedWith, respondWithDefaultHeader, respondWithDefaultHeaders, respondWithHeader, respondWithHeaders, responseEncodingAccepted, scheme, setCookie, textract, tprovide, withExecutionContext, withFlowMaterializer, withLog, withRangeSupport, withSettings

Real-World Example (spray)


lazy val route =
  encodeResponse(Gzip) {
    pathEndOrSingleSlash {
      get {
        redirect("/doc")
      }
    } ~
    pathPrefix("api") {
      path("top-articles") {
        get {
          parameter("max".as[Int]) { max =>
            validate(max >= 0, "query parameter 'max' must be >= 0") {
              complete {
                (topArticlesService ? max).mapTo[Seq[Article]]
              }
            }
          }
        }
      } ~
      tokenAuthenticate { user =>
        path("ranking") {
          get {
            countAndTime(user, "ranking") {
              parameters("fixed" ? 0, "mobile" ? 0, "sms" ? 0, "mms" ? 0,
                         "data" ? 0).as(RankingDescriptor) { descr =>
                complete {
                  (rankingService ? Ranking(descr)).mapTo[RankingResult]
                }
              }
            }
          }
        } ~
        path("accounts") {
          post {
            authorize(user.isAdmin) {
              entity(as[AccountDetails]) { details =>
                complete {
                  (accountService ? NewAccount(details)).mapTo[OpResult]
                }
              }
            }
          }
        } ~
        path("account" / IntNumber) { accountId =>
          get { ... } ~
          put { ... } ~
          delete { ... }
        }
      }
    } ~
    pathPrefix("v1") {
      proxyToDjango
    } ~
    pathPrefix("doc") {
      respondWithHeader(`Cache-Control`(`max-age`(3600))) {
        transformResponse(_.withContentTransformed(markdown2Html)) {
          getFromResourceDirectory("doc/root",
                                   pathRewriter = appendFileExt)
        }
      }
    }
  } ~
  cacheIfEnabled {
    encodeResponse(Gzip) {
      getFromResourceDirectory("public")
    }
  }
      

Best Practices

  • Keep route structure clean and readable,
    pull out all logic into custom directives
  • Don’t let API layer leak into application
  • Use (Un)marshalling infrastructure
  • Think about error handling/reporting
    right from the start
  • Use sbt-revolver for fast dev turn-around

There is more...

  • Testing routes
  • Client-side APIs
  • JSON support
  • ...

akka-http Roadmap

  • Agree and specify Reactive Streams API
  • Finish initial release of new akka modules
    • akka-stream
    • akka-http-core (client- & server-side)
    • akka-http (server-side)
    • akka-http (client-side)
  • Add websockets support (client- & server-side)
  • Move Play onto akka-http (incrementally)

Resources

THANK YOU!

More Q & A