MSUG 2014-10-09@munich
Mathias Doenitz
/
/
This presentation: http://spray.io/msug/
twitterStream
.produceTo(rxjavaObservable)
.produceTo(reactorStream)
.produceTo(akkaStream)
Isn't this "feature creep"?
Too high-level?
Where is the connection to its "actor heart"?
"A toolkit and runtime for building
highly concurrent, distributed, and fault-tolerant
event-driven applications on the JVM"
"Build powerful
concurrent
&
distributed
applications more easily"
// Command message sent to IO(Http): pairs a sink of IncomingConnection values with a
// local socket address. The trailing `...` stands for further parameters not shown here.
case class Bind(
connectionSink: Sink[IncomingConnection],
localAddress: InetSocketAddress, ...)
// One accepted connection, modelled as streams: the peer's socket address, a Source of
// the HttpRequests arriving on it, and a Sink that takes the HttpResponses to send back.
case class IncomingConnection(
remoteAddress: InetSocketAddress,
requestSource: Source[HttpRequest],
responseSink: Sink[HttpResponse])
// Minimal server: every incoming connection gets its request stream mapped to responses,
// which are then fed into that connection's response sink.
val connectionSink =
  ForeachSink[IncomingConnection] { connection =>
    // Request → response mapping: GET /ping is answered with "PONG!", anything else with a 404.
    val respond: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒
        HttpResponse(entity = "PONG!")
      case _ ⇒
        HttpResponse(404, entity = "Unknown resource!")
    }

    connection.requestSource
      .map(respond)
      .connect(connection.responseSink)
      .run()
  }

// Hand the sink to the HTTP extension, bound to localhost:8080.
IO(Http) ! Http.Bind(connectionSink, "localhost", 8080)
// Message carrying the remote socket address; the trailing `...` stands for further
// parameters not shown here.
// NOTE(review): presumably the client-side counterpart of Bind — confirm.
case class Connect(
remoteAddress: InetSocketAddress, ...)
// An outgoing connection: both socket addresses plus the HttpClientFlow[Any] used to
// process requests on it.
case class OutgoingConnection(
remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
processor: HttpClientFlow[Any])
// A Flow that turns (HttpRequest, T) pairs into (HttpResponse, T) pairs.
// NOTE(review): T presumably is caller-supplied context passed through unchanged so that
// responses can be correlated with their requests — confirm.
trait HttpClientFlow[T]
extends Flow[(HttpRequest, T), (HttpResponse, T)]
// Immutable request model. Every field has a default, so HttpRequest() is a
// GET of "/" with no headers, an empty entity and protocol HTTP/1.1.
case class HttpRequest(
method: HttpMethod = HttpMethods.GET,
uri: Uri = Uri./,
headers: immutable.Seq[HttpHeader] = Nil,
entity: RequestEntity = HttpEntity.Empty,
protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
// Immutable response model. Every field has a default, so HttpResponse() is a
// 200 OK with no headers, an empty entity and protocol HTTP/1.1.
case class HttpResponse(
status: StatusCode = StatusCodes.OK,
headers: immutable.Seq[HttpHeader] = Nil,
entity: ResponseEntity = HttpEntity.Empty,
protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
// URI model split into its five components; only the fragment is optional.
// The trailing comments below read top to bottom as one sentence.
case class Uri( // proper RFC 3986
scheme: String, // compliant,
authority: Authority, // immutable
path: Path, // URI model
query: Query, // with a fast,
fragment: Option[String]) // custom parser
// Entity type hierarchy: the static type of an entity states where it may be used.
// Root of all entity types.
sealed trait HttpEntity
// can be used for responses
sealed trait ResponseEntity extends HttpEntity
// every request entity is also a valid response entity
sealed trait RequestEntity extends ResponseEntity
// can be used for any message (request or response)
type MessageEntity = RequestEntity
// can be used for bodyparts
sealed trait BodyPartEntity extends HttpEntity
// can be used for messages as well as bodyparts
// (it must extend both branches; extending only HttpEntity would make universal entities
// unusable wherever a RequestEntity, ResponseEntity or BodyPartEntity is required)
sealed trait UniversalEntity extends MessageEntity with BodyPartEntity
// The concrete entity variants. All carry a ContentType; they differ in how the
// data is held and in which entity trait they extend.
object HttpEntity {
// Data held directly as a ByteString.
case class Strict(contentType: ContentType,
data: ByteString) extends UniversalEntity
// Data supplied as a Source[ByteString] together with an explicit contentLength.
case class Default(contentType: ContentType, contentLength: Long,
data: Source[ByteString]) extends UniversalEntity
// Data supplied as a Source of ChunkStreamPart elements; a MessageEntity.
case class Chunked(contentType: ContentType,
chunks: Source[ChunkStreamPart]) extends MessageEntity
// Source[ByteString] with no length field; typed as ResponseEntity only.
case class CloseDelimited(contentType: ContentType,
data: Source[ByteString]) extends ResponseEntity
// Source[ByteString] with no length field; typed as BodyPartEntity only.
case class IndefiniteLength(contentType: ContentType,
data: Source[ByteString]) extends BodyPartEntity
}
// Header models: each header is a case class extending HttpHeader. The typed ones are
// named after the header via backtick identifiers and hold structured values.
case class `Accept-Charset`(charsetRanges: immutable.Seq[HttpCharsetRange])
extends HttpHeader
case class `Cache-Control`(directives: immutable.Seq[CacheDirective])
extends HttpHeader
case class `Set-Cookie`(cookie: HttpCookie)
extends HttpHeader
// Untyped header: plain name/value strings.
case class RawHeader(name: String, value: String)
extends HttpHeader
import ScalaRoutingDSL._
// Routing-DSL variant of the server: the connection handler is produced by runRoute
// from a route instead of a hand-written request match.
val connectionSink =
ForeachSink {
runRoute {
// Binds the value matched by HexIntNumber after "order/" to `id`.
path("order" / HexIntNumber) { id =>
get {
complete(s"Received GET for order $id")
} ~
// `~` chains the PUT alternative after the GET one.
put {
complete(s"Received PUT for order $id")
}
}
}
}
// Hand the sink to the HTTP extension, bound to localhost:8080.
IO(Http) ! Http.Bind(connectionSink, "localhost", 8080)
alwaysCache, anyParam, anyParams, authenticate, authorize, autoChunk, cache, cachingProhibited, cancelAllRejections, cancelRejection, clientIP, complete, compressResponse, compressResponseIfRequested, cookie, decodeRequest, decompressRequest, delete, deleteCookie, detach, dynamic, dynamicIf, encodeResponse, entity, extract, failWith, formField, formFields, get, getFromBrowseableDirectories, getFromBrowseableDirectory, getFromDirectory, getFromFile, getFromResource, getFromResourceDirectory, handleExceptions, handleRejections, handleWith, head, headerValue, headerValueByName, headerValuePF, hextract, host, hostName, hprovide, jsonpWithParameter, listDirectoryContents, logRequest, logRequestResponse, logResponse, mapHttpResponse, mapHttpResponseEntity, mapHttpResponseHeaders, mapHttpResponsePart, mapInnerRoute, mapRejections, mapRequest, mapRequestContext, mapRouteResponse, mapRouteResponsePF, method, noop, onComplete, onFailure, onSuccess, optionalCookie, optionalHeaderValue, optionalHeaderValueByName, optionalHeaderValuePF, options, overrideMethodWithParameter, parameter, parameterMap, parameterMultiMap, parameters, parameterSeq, pass, patch, path, pathPrefix, pathPrefixTest, pathSuffix, pathSuffixTest, post, produce, provide, put, rawPath, rawPathPrefix, rawPathPrefixTest, redirect, reject, rejectEmptyResponse, requestEncodedWith, requestEntityEmpty, requestEntityPresent, respondWithHeader, respondWithHeaders, respondWithLastModifiedHeader, respondWithMediaType, respondWithSingletonHeader, respondWithSingletonHeaders, respondWithStatus, responseEncodingAccepted, rewriteUnmatchedPath, routeRouteResponse, scheme, schemeName, setCookie, unmatchedPath, validate
// Larger example route. Two top-level alternatives are chained with `~`:
//   1. a block wrapped in encodeResponse(Gzip) covering the root redirect and the
//      "api", "v1" and "doc" path prefixes;
//   2. a cacheIfEnabled block serving the "public" resource directory, also wrapped in
//      encodeResponse(Gzip).
// The `{ ... }` bodies under "account" are placeholders for elided code.
lazy val route =
  encodeResponse(Gzip) {
    pathEndOrSingleSlash {
      get {
        redirect("/doc")
      }
    } ~
    pathPrefix("api") {
      path("top-articles") {
        get {
          parameter("max".as[Int]) { max =>
            validate(max >= 0, "query parameter 'max' must be >= 0") {
              complete {
                (topArticlesService ? max).mapTo[Seq[Article]]
              }
            }
          }
        }
      } ~
      // Everything below requires an authenticated user.
      tokenAuthenticate { user =>
        path("ranking") {
          get {
            countAndTime(user, "ranking") {
              // Each parameter defaults to 0; the values are converted to a RankingDescriptor.
              parameters("fixed" ? 0, "mobile" ? 0, "sms" ? 0, "mms" ? 0,
                "data" ? 0).as(RankingDescriptor) { descr =>
                complete {
                  (rankingService ? Ranking(descr)).mapTo[RankingResult]
                }
              }
            }
          }
        } ~
        path("accounts") {
          post {
            authorize(user.isAdmin) {
              entity(as[AccountDetails]) { details =>
                complete {
                  (accountService ? NewAccount(details)).mapTo[OpResult]
                }
              }
            }
          }
        } ~
        path("account" / IntNumber) { accountId =>
          get { ... } ~
          put { ... } ~
          delete { ... }
        }
      }
    } ~
    pathPrefix("v1") {
      proxyToDjango
    } ~
    // Last alternative inside encodeResponse: it must NOT be followed by `~`, because the
    // next token is the closing brace of the encodeResponse block.
    pathPrefix("doc") {
      respondWithHeader(`Cache-Control`(`max-age`(3600))) {
        transformResponse(_.withContentTransformed(markdown2Html)) {
          getFromResourceDirectory("doc/root",
            pathRewriter = appendFileExt)
        }
      }
    }
  } ~
  cacheIfEnabled {
    encodeResponse(Gzip) {
      getFromResourceDirectory("public")
    }
  }