REST on Akka: Connect to the World

Jfokus, Stockholm, 2015-02-03

Mathias Doenitz

This presentation: http://spray.io/jfokus/

Why do we all need a
(proper) HTTP stack?

We live in a multi-everything world

  • Multi-Threaded
  • Multi-Cored
  • Multi-Machined
  • Multi-Data-Centered

All our applications are distributed

  • Across threads and cores
  • Across machines and data centers
  • Across organisations
  • Across the world!

Akka

"A toolkit and runtime for building
highly concurrent, distributed, and fault-tolerant
event-driven applications on the JVM"

Source: http://akka.io

Akka's Promise

"Build powerful concurrent & distributed
applications more easily"

Source: http://akka.io

Distribution implies integration

  • Within one machine
    e.g. akka-actor
  • Between your own (sub-)systems
    e.g. akka-remoting & akka-cluster
  • With other (external) systems
    e.g. akka-http
  • HTTP is the most successful
    external integration protocol to date!

Akka-Http Design Goals

  • Max throughput with acceptable latency
  • Properly scale to "unlimited" number
    of connections, across all cores
  • Server- and client-side support
  • Open, composable APIs ("not-a-framework style")
  • Focus: HTTP integration layers

Akka-Http Implementation

  • Fully async & non-blocking
  • Actor-friendly, but centered around non-actor APIs
  • Lightweight, modular, testable
  • Fully decoupleable from underlying TCP interface
  • Based on spray.io codebase and experience

spray heritage

  • Embeddable HTTP stack
    entirely built on Akka actors
  • Immutable, case-class-based HTTP model
  • Efficient HTTP parsing and rendering logic
  • Powerful DSL for server-side
    REST API definition
  • Fully integrated into Typesafe stack
    (threadpools, config, debugging, etc.)

spray weaknesses

  • Handling of chunked requests is clunky, incomplete
  • Dealing with large message entities can be difficult
  • High-level routing DSL sometimes unintuitive,
    some mistakes not caught at compile-time
  • Deep implicit structures, sometimes hard to debug
  • Missing features (e.g. websocket support)

Proxying Large Responses

[Diagram: backpressure needed]

akka-http is spray 2.0

  • Across-the-board polishing,
    addressing of weaknesses
  • Java APIs
  • Simplified module structure
  • Core improvement: now fully
    based on Reactive Streams

Reactive Streams

  • New abstraction for async & non-blocking
    pipeline processing "done right"
  • Main advantage over prior works:
    automatic support for back-pressure
  • Standard API, impls provide DSLs for
    composable stream transformations
  • Many use cases
  • Joint effort of Netflix, Twitter, Red Hat,
    Pivotal and Typesafe
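
To make the back-pressure point concrete, here is a minimal sketch in plain Scala. It is a toy model, not the Reactive Streams API: `ToySubscriber`, `demand` and `publish` are hypothetical names, and everything runs synchronously. The only idea it illustrates is that the publisher never hands over more elements than the subscriber has asked for.

```scala
object BackpressureSketch {
  // Toy subscriber that states how many elements it can take at once.
  // Hypothetical stand-in; NOT the org.reactivestreams interfaces.
  final class ToySubscriber(batchSize: Int) {
    val received = scala.collection.mutable.ArrayBuffer.empty[Int]
    var maxInFlight = 0 // largest batch ever handed over

    def demand: Int = batchSize

    def onNext(batch: List[Int]): Unit = {
      maxInFlight = math.max(maxInFlight, batch.size)
      received ++= batch
    }
  }

  // The publisher pulls at most `demand` elements per round,
  // so a slow subscriber is never flooded.
  def publish(elements: Iterator[Int], sub: ToySubscriber): Unit =
    while (elements.hasNext) {
      val batch = List.newBuilder[Int]
      var remaining = sub.demand
      while (remaining > 0 && elements.hasNext) {
        batch += elements.next()
        remaining -= 1
      }
      sub.onNext(batch.result())
    }
}
```

With a demand of 3 and ten elements, the subscriber sees every element but never more than three per hand-over.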

Streams in akka-http

  • Requests on one HTTP connection
  • Responses on one HTTP connection
  • Chunks of a chunked message
  • Bytes of a message entity

HTTP Stream Interfaces

Reactive HTTP Streams

The Application Stack

akka-io

  • Bridges the gap between Java NIO
    and Akka actors / streams
  • Provides both msg-based
    as well as stream-based API
  • Supports TCP, UDP and SSL/TLS

akka-http-core

  • Directly sits on top of Akka IO
  • Performs TCP ⇄ HTTP "translation"
  • Cleanly separated layer of stream
    transformations provided as an Akka Extension
  • Implements HTTP "essentials",
    no higher-level features (like file serving)

akka-http

  • Provides higher-level server- and client-side APIs
  • "Unmarshalling" custom types from HttpEntities
  • "Marshalling" custom types to HttpEntities
  • (De)compression (GZip / Deflate)
  • Routing DSLs
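
The (un)marshalling idea can be sketched with two small type classes. This is a simplified illustration under stated assumptions: `ToyMarshaller`, `ToyUnmarshaller`, `Order` and the `id;item` wire format are all hypothetical, and the entity is reduced to a plain `String` instead of an `HttpEntity`.

```scala
object MarshallingSketch {
  // Hypothetical type classes; the real akka-http ones work on HttpEntities.
  trait ToyMarshaller[T] { def marshal(value: T): String }
  trait ToyUnmarshaller[T] { def unmarshal(entity: String): Option[T] }

  final case class Order(id: Int, item: String)

  // Made-up wire format for illustration: "<id>;<item>"
  implicit val orderMarshaller: ToyMarshaller[Order] =
    new ToyMarshaller[Order] {
      def marshal(o: Order): String = s"${o.id};${o.item}"
    }

  implicit val orderUnmarshaller: ToyUnmarshaller[Order] =
    new ToyUnmarshaller[Order] {
      def unmarshal(entity: String): Option[Order] =
        entity.split(";", 2) match {
          case Array(id, item) =>
            scala.util.Try(id.toInt).toOption.map(Order(_, item))
          case _ => None // malformed entity
        }
    }

  // The API layer only asks for "some (un)marshaller for T".
  def toEntity[T](value: T)(implicit m: ToyMarshaller[T]): String =
    m.marshal(value)

  def fromEntity[T](entity: String)(implicit u: ToyUnmarshaller[T]): Option[T] =
    u.unmarshal(entity)
}
```

Keeping the conversion in implicit instances means the domain class itself stays free of any HTTP concern.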

akka-stream: basic concepts

  • `Source[T]`: the open end of a pipeline producing `T`s
  • `Sink[T]`: an "end-piece" for taking in `T`s
  • `Flow[A, B]`: an unconnected piece of pipeline
  • Generally, all abstractions are re-useable
  • "Materialization":
    The process of starting an actual stream instance
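
A toy model of these concepts, in plain Scala without akka-stream: `ToySource`, `ToyFlow` and `ToySink` are hypothetical stand-ins that only mimic the shape of the real abstractions. The point is that the pieces are immutable blueprints, and that nothing runs until a pipeline is materialized (here: `runWith`).

```scala
object StreamBlueprintSketch {
  // Immutable descriptions only; building a pipeline processes no elements.
  final case class ToyFlow[A, B](transform: Iterator[A] => Iterator[B])
  final case class ToySink[T, R](consume: Iterator[T] => R)

  final case class ToySource[T](elements: () => Iterator[T]) {
    // Attaching a flow yields a new, still unstarted source.
    def via[U](flow: ToyFlow[T, U]): ToySource[U] =
      ToySource(() => flow.transform(elements()))

    // "Materialization": start one concrete run of the blueprint.
    def runWith[R](sink: ToySink[T, R]): R = sink.consume(elements())
  }

  val numbers: ToySource[Int] = ToySource(() => Iterator(1, 2, 3))
  val doubled: ToyFlow[Int, Int] = ToyFlow[Int, Int](_.map(_ * 2))
  val sum: ToySink[Int, Int] = ToySink[Int, Int](_.sum)
}
```

Because the blueprint holds no running state, `numbers.via(doubled)` can be materialized any number of times, each run independent of the others.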

HTTP Server API (1/3)


def bind(endpoint: InetSocketAddress, ...): Http.ServerBinding
  
object Http {
  trait ServerBinding {
    def connections: Source[IncomingConnection]

    def localAddress(mm: MaterializedMap): Future[InetSocketAddress]

    def unbind(mm: MaterializedMap): Future[Unit]

    /* ... plus `startHandlingWithXXX(...)` sugar ... */
  }
}
        

HTTP Server API (2/3)


object Http {
  trait IncomingConnection {
    def localAddress: InetSocketAddress
    def remoteAddress: InetSocketAddress

    def handleWith
      (handler: Flow[HttpRequest, HttpResponse]): MaterializedMap

    def handleWithSyncHandler
      (handler: HttpRequest ⇒ HttpResponse): MaterializedMap

    def handleWithAsyncHandler
      (handler: HttpRequest ⇒ Future[HttpResponse]): MaterializedMap
  }
}
        

HTTP Server API (3/3)


/**
 * Transforms a given HTTP-level server Flow
 * into a lower-level TCP transport flow.
 */
def serverFlowToTransport(
  serverFlow: Flow[HttpRequest, HttpResponse],
  ...): Flow[ByteString, ByteString]
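
The layering idea behind this method can be sketched without any streams. The following is a deliberately simplified assumption-laden model: raw transport data is a `String` instead of a `ByteString`, the "protocol" is a made-up `METHOD PATH` line, and `toTransport`, `ToyRequest` and `ToyResponse` are hypothetical names. It only shows how an HTTP-level handler is wrapped into a transport-level function.

```scala
object TransportSketch {
  final case class ToyRequest(method: String, path: String)
  final case class ToyResponse(status: Int, body: String)

  // Wraps an HTTP-level handler with (toy) parsing and rendering,
  // yielding a function that works purely on raw transport data.
  def toTransport(handler: ToyRequest => ToyResponse): String => String =
    raw => raw.trim.split(" ") match {
      case Array(method, path) =>
        val response = handler(ToyRequest(method, path))
        s"${response.status} ${response.body}"
      case _ => "400 Malformed request"
    }

  val pingHandler: ToyRequest => ToyResponse = {
    case ToyRequest("GET", "/ping") => ToyResponse(200, "PONG!")
    case _                          => ToyResponse(404, "Unknown resource!")
  }
}
```

Since the result no longer mentions requests or responses, it can be plugged onto any byte transport, which is what makes the HTTP layer decoupleable from TCP.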
        

Simple HTTP Server


val binding = Http().bind("localhost", 8080)

binding startHandlingWithSyncHandler {

  case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒
    HttpResponse(entity = "PONG!")

  case _ ⇒ // catch all
    HttpResponse(404, entity = "Unknown resource!")
}
        

Basic HTTP Client API (1/2)


def outgoingConnection(endpoint: InetSocketAddress, ...)
  : Http.OutgoingConnection

object Http {
  trait OutgoingConnection {
    def remoteAddress: InetSocketAddress
    def localAddress(mm: MaterializedMap): Future[InetSocketAddress]
    def flow: Flow[HttpRequest, HttpResponse]
  }
}
        

Basic HTTP Client API (2/2)


/**
 * Transforms the given low-level TCP client transport
 * Flow into a higher-level HTTP client flow.
 */
def transportToConnectionClientFlow(
  transport: Flow[ByteString, ByteString],
  ...): Flow[HttpRequest, HttpResponse]
        

HTTP model

  • `case class`-based data model
  • High-level abstractions for most things HTTP
  • Fully immutable, little logic
  • Predefines common media types, status codes, encodings, charsets, cache-control directives, etc.
  • Open for extension (e.g. custom media types)
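
What "fully immutable, case-class-based" buys in practice, sketched with reduced stand-ins: `ToyHeader` and `ToyResponse` are hypothetical and use plain `Int` and `String` fields instead of the real model types. Changes are expressed with `copy`, which returns a new value and leaves the original untouched.

```scala
object ImmutableModelSketch {
  // Simplified stand-ins for illustration only.
  final case class ToyHeader(name: String, value: String)
  final case class ToyResponse(
    status: Int = 200,
    headers: List[ToyHeader] = Nil,
    entity: String = "")

  val ok: ToyResponse = ToyResponse(entity = "PONG!")

  // `copy` builds a new instance; `ok` itself is never modified.
  val notFound: ToyResponse =
    ok.copy(status = 404, entity = "Unknown resource!")

  val cached: ToyResponse =
    ok.copy(headers = ToyHeader("Cache-Control", "max-age=3600") :: ok.headers)
}
```

Values like these can be shared freely between threads and actors without defensive copying.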

HTTP Request


case class HttpRequest(
  method: HttpMethod = HttpMethods.GET,
  uri: Uri = Uri./,
  headers: immutable.Seq[HttpHeader] = Nil,
  entity: RequestEntity = HttpEntity.Empty,
  protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
        

HTTP Response


case class HttpResponse(
  status: StatusCode = StatusCodes.OK,
  headers: immutable.Seq[HttpHeader] = Nil,
  entity: ResponseEntity = HttpEntity.Empty,
  protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
        

HTTP model: Uri


case class Uri(             // proper RFC 3986
  scheme: String,           // compliant,
  authority: Authority,     // immutable
  path: Path,               // URI model
  query: Query,             // with a fast,
  fragment: Option[String]) // custom parser
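
To see what the five components hold for a concrete URI, here is a small sketch that uses the JDK's `java.net.URI` as a stand-in. It is not the akka-http `Uri` class, and the example address is made up; only the decomposition into scheme, authority, path, query and fragment is the same idea.

```scala
import java.net.URI

object UriComponentsSketch {
  // Example address chosen purely for illustration.
  val uri = new URI("http://example.com:8080/order/42?max=10#details")

  val scheme: String = uri.getScheme
  val authority: String = uri.getAuthority
  val path: String = uri.getPath
  val query: String = uri.getQuery
  // getFragment returns null when absent, so wrap it in an Option.
  val fragment: Option[String] = Option(uri.getFragment)
}
```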
        

HTTP Entity (1/2)


sealed trait HttpEntity

sealed trait ResponseEntity extends HttpEntity

sealed trait RequestEntity extends ResponseEntity

// can be used for any message (request or response)
type MessageEntity = RequestEntity

sealed trait BodyPartEntity extends HttpEntity

// can be used for messages as well as bodyparts
sealed trait UniversalEntity extends MessageEntity
                             with BodyPartEntity
        

HTTP Entity (2/2)


object HttpEntity {
  case class Strict(contentType: ContentType,
    data: ByteString) extends UniversalEntity

  case class Default(contentType: ContentType, contentLength: Long,
    data: Source[ByteString]) extends UniversalEntity

  case class Chunked(contentType: ContentType,
    chunks: Source[ChunkStreamPart]) extends MessageEntity

  case class CloseDelimited(contentType: ContentType,
    data: Source[ByteString]) extends ResponseEntity
  
  case class IndefiniteLength(contentType: ContentType,
    data: Source[ByteString]) extends BodyPartEntity                         
}
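
A sealed hierarchy like this lets the compiler check that code handles every entity kind. The sketch below uses a reduced, hypothetical hierarchy (`ToyStrict`, `ToyDefault`, `ToyChunked`, with `String` data and a thunk of `Iterator[String]` in place of `Source[ByteString]`) to show one typical question: is the content length known up front?

```scala
object EntityMatchSketch {
  // Reduced stand-ins for three of the entity kinds above.
  sealed trait ToyEntity
  final case class ToyStrict(data: String) extends ToyEntity
  final case class ToyDefault(contentLength: Long,
                              data: () => Iterator[String]) extends ToyEntity
  final case class ToyChunked(chunks: () => Iterator[String]) extends ToyEntity

  // Exhaustive match: adding a new entity kind triggers a compiler warning here.
  def knownLength(entity: ToyEntity): Option[Long] = entity match {
    case ToyStrict(data)       => Some(data.getBytes("UTF-8").length.toLong)
    case ToyDefault(length, _) => Some(length)
    case ToyChunked(_)         => None // only known once the stream ends
  }
}
```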
        

Exemplary HTTP Headers


case class `Accept-Charset`(charsetRanges: immutable.Seq[HttpCharsetRange])
  extends HttpHeader

case class `Cache-Control`(directives: immutable.Seq[CacheDirective])
  extends HttpHeader

case class `Set-Cookie`(cookie: HttpCookie)
  extends HttpHeader

case class RawHeader(name: String, value: String)
  extends HttpHeader
        

Server-Side Routing DSL

  • Internal DSL for the interface
    layer to the application
  • Type-safe, yet flexible
  • Much more than just routing:
    behavior definition
  • Small and simple building blocks: directives
  • Highly composable
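
How small building blocks compose can be shown with a toy re-implementation in plain Scala. None of this is the akka-http DSL: `Req`, `Route`, `method`, `path`, `complete` and the `~` operator below are hypothetical and heavily simplified. A route is just a function that either answers a request or rejects it (`None`), and directives are functions that wrap an inner route.

```scala
object RoutingSketch {
  final case class Req(method: String, path: String)

  // A route answers with Some(body) or rejects with None.
  type Route = Req => Option[String]

  // `~` tries the second route only if the first one rejected.
  implicit class RouteOps(first: Route) {
    def ~(second: Route): Route = req => first(req).orElse(second(req))
  }

  // Directives: each one filters requests and delegates to an inner route.
  def method(name: String)(inner: Route): Route =
    req => if (req.method == name) inner(req) else None

  def path(p: String)(inner: Route): Route =
    req => if (req.path == p) inner(req) else None

  def complete(body: String): Route = _ => Some(body)

  val route: Route =
    path("/order") {
      method("GET") { complete("Received GET") } ~
      method("PUT") { complete("Received PUT") }
    }
}
```

Because every directive returns a `Route`, nesting and chaining with `~` work uniformly at any depth.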

Server-Side API Layer: Overview

[Diagram: application and API layer]

API Layer Responsibilities

  • Request routing based on method, path, query, entity
  • (Un)marshalling to / from domain objects
  • Encoding / decoding (compression)
  • Authentication / authorization
  • Caching and serving static content
  • RESTful error handling

Routing DSL: show me code


import Directives._

val binding = Http().bind("localhost", 8080)

binding startHandlingWith {
  path("order" / HexIntNumber) { id =>
    get {
      complete(s"Received GET for order $id")
    } ~
    put {
      complete(s"Received PUT for order $id")
    }
  }
}
      

Predefined Directives (1.0-M1)

authorize, cancelRejection, cancelRejections, complete, completeOrRecoverWith, completeWith, compressResponse, compressResponseIfRequested, conditional, cookie, decodeRequest, decompressRequest, delete, deleteCookie, encodeResponse, entity, extract, extractExecutionContext, extractFlowMaterializer, extractHost, extractLog, extractRequest, extractScheme, extractSettings, extractUnmatchedPath, extractUri, failWith, formField, formFields, get, getFromBrowseableDirectories, getFromBrowseableDirectory, getFromDirectory, getFromFile, getFromResource, getFromResourceDirectory, handleExceptions, handleRejections, handleWith, head, headerValue, headerValueByName, headerValueByType, headerValuePF, host, listDirectoryContents, logRequest, logRequestResult, logResult, mapInnerRoute, mapRejections, mapRequest, mapRequestContext, mapResponse, mapResponseEntity, mapResponseHeaders, mapRouteResult, mapRouteResultFuture, mapRouteResultPF, mapRouteResultWith, mapRouteResultWithPF, mapSettings, mapUnmatchedPath, method, onComplete, onSuccess, optionalCookie, optionalHeaderValue, optionalHeaderValueByName, optionalHeaderValueByType, optionalHeaderValuePF, options, overrideMethodWithParameter, overrideStatusCode, parameter, parameterMap, parameterMultiMap, parameters, parameterSeq, pass, patch, path, pathEnd, pathEndOrSingleSlash, pathPrefix, pathPrefixTest, pathSingleSlash, pathSuffix, pathSuffixTest, post, provide, put, rawPathPrefix, rawPathPrefixTest, recoverRejections, recoverRejectionsWith, redirect, reject, requestEncodedWith, respondWithDefaultHeader, respondWithDefaultHeaders, respondWithHeader, respondWithHeaders, responseEncodingAccepted, scheme, setCookie, textract, tprovide, withExecutionContext, withFlowMaterializer, withLog, withRangeSupport, withSettings

Real-World Example (spray)


lazy val route =
  encodeResponse(Gzip) {
    pathEndOrSingleSlash {
      get {
        redirect("/doc")
      }
    } ~
    pathPrefix("api") {
      path("top-articles") {
        get {
          parameter("max".as[Int]) { max =>
            validate(max >= 0, "query parameter 'max' must be >= 0") {
              complete {
                (topArticlesService ? max).mapTo[Seq[Article]]
              }
            }
          }
        }
      } ~
      tokenAuthenticate { user =>
        path("ranking") {
          get {
            countAndTime(user, "ranking") {
              parameters("fixed" ? 0, "mobile" ? 0, "sms" ? 0, "mms" ? 0,
                         "data" ? 0).as(RankingDescriptor) { descr =>
                complete {
                  (rankingService ? Ranking(descr)).mapTo[RankingResult]
                }
              }
            }
          }
        } ~
        path("accounts") {
          post {
            authorize(user.isAdmin) {
              entity(as[AccountDetails]) { details =>
                complete {
                  (accountService ? NewAccount(details)).mapTo[OpResult]
                }
              }
            }
          }
        } ~
        path("account" / IntNumber) { accountId =>
          get { ... } ~
          put { ... } ~
          delete { ... }
        }
      }
    } ~
    pathPrefix("v1") {
      proxyToDjango
    } ~
    pathPrefix("doc") {
      respondWithHeader(`Cache-Control`(`max-age`(3600))) {
        transformResponse(_.withContentTransformed(markdown2Html)) {
          getFromResourceDirectory("doc/root",
                                   pathRewriter = appendFileExt)
        }
      }
    }
  } ~
  cacheIfEnabled {
    encodeResponse(Gzip) {
      getFromResourceDirectory("public")
    }
  }
      

Best Practices

  • Keep route structure clean and readable,
    pull out all logic into custom directives
  • Don’t let API layer leak into application
  • Use (Un)marshalling infrastructure
  • Think about error handling/reporting
    right from the start
  • Use sbt-revolver for fast dev turn-around

There is more...

  • Testing routes
  • Client-side APIs
  • JSON support
  • ...

akka-http Roadmap

  • Agree on and specify the Reactive Streams API
  • Finish initial release of new akka modules
    • akka-stream
    • akka-http-core (client- & server-side)
    • akka-http (server-side)
    • akka-http (client-side)
  • Add websockets support (client- & server-side)
  • Move Play onto akka-http (incrementally)

Resources

THANK YOU!

Q & A