2015-01-13 @ Vienna Scala User Group
Mathias Doenitz
/
/
This presentation: http://spray.io/vienna/
twitterStream
.produceTo(rxjavaObservable)
.produceTo(reactorStream)
.produceTo(akkaStream)
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Source(stockTickerPublisher) // Source[Tick]
.filter(_.symbol == "AAPL") // Source[Tick]
.buffer(100000, OverflowStrategy.DropHead) // Source[Tick]
.splitWhen(x => isNewDay(x.timeStamp)) // Source[Source[Tick]]
.headAndTail // Source[(Tick, Source[Tick])]
.map { case (head, tail) =>
head -> tail.groupedWithin(1000, 1.second)
} // Source[(Tick, Source[Seq[[Tick]]])]
.via(someFlow) // Source[RichTick]
.map(toCandleStickChart) // Source[CandleStickChart]
.to(candleStickChartSink) // RunnableFlow
.run() // MaterializedMap
FlowGraph { implicit b ⇒
val bcast = Broadcast[T]
val merge = Merge[T]
source ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> sink
bcast ~> f4 ~> merge
}.run()
Isn't this "feature creep"?
Too high-level?
Where is the connection to its "actor heart"?
"A toolkit and runtime for building
highly concurrent, distributed, and fault-tolerant
event-driven applications on the JVM"
"Build powerful
concurrent
concurrent
&
distributed
distributed
applications more easily"
def bind(endpoint: InetSocketAddress, ...): Http.ServerBinding
object Http {
trait ServerBinding {
def connections: Source[IncomingConnection]
def localAddress(mm: MaterializedMap): Future[InetSocketAddress]
def unbind(mm: MaterializedMap): Future[Unit]
/* ... plus `startHandlingWithXXX(...)` sugar ... */
}
}
object Http {
trait IncomingConnection {
def localAddress: InetSocketAddress
def remoteAddress: InetSocketAddress
def handleWith
(handler: Flow[HttpRequest, HttpResponse]): MaterializedMap
def handleWithSyncHandler
(handler: HttpRequest ⇒ HttpResponse): MaterializedMap
def handleWithAsyncHandler
(handler: HttpRequest ⇒ Future[HttpResponse]): MaterializedMap
}
}
/**
* Transforms a given HTTP-level server Flow
* into a lower-level TCP transport flow.
*/
def serverFlowToTransport(
serverFlow: Flow[HttpRequest, HttpResponse],
...): Flow[ByteString, ByteString]
val binding = Http().bind("localhost", 8080)
binding startHandlingWithSyncHandler {
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒
HttpResponse(entity = "PONG!")
case _ ⇒ // catch all
HttpResponse(404, entity = "Unknown resource!")
}
def outgoingConnection(endpoint: InetSocketAddress, ...)
: Http.OutgoingConnection
object Http {
trait OutgoingConnection {
def remoteAddress: InetSocketAddress
def localAddress(mm: MaterializedMap): Future[InetSocketAddress]
def flow: Flow[HttpRequest, HttpResponse]
}
}
/**
* Transforms the given low-level TCP client transport
* Flow into a higher-level HTTP client flow.
*/
def transportToConnectionClientFlow(
transport: Flow[ByteString, ByteString],
...): Flow[HttpRequest, HttpResponse]
case class HttpRequest(
method: HttpMethod = HttpMethods.GET,
uri: Uri = Uri./,
headers: immutable.Seq[HttpHeader] = Nil,
entity: RequestEntity = HttpEntity.Empty,
protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
case class HttpResponse(
status: StatusCode = StatusCodes.OK,
headers: immutable.Seq[HttpHeader] = Nil,
entity: ResponseEntity = HttpEntity.Empty,
protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`
) extends HttpMessage
case class Uri( // proper RFC 3986
scheme: String, // compliant,
authority: Authority, // immutable
path: Path, // URI model
query: Query, // with a fast,
fragment: Option[String]) // custom parser
sealed trait HttpEntity
sealed trait ResponseEntity extends HttpEntity
sealed trait RequestEntity extends ResponseEntity
// can be used for any message (request or response)
type MessageEntity = RequestEntity
sealed trait BodyPartEntity extends HttpEntity
// can be used for messages as well as bodyparts
sealed trait UniversalEntity extends HttpEntity
object HttpEntity {
case class Strict(contentType: ContentType,
data: ByteString) extends UniversalEntity
case class Default(contentType: ContentType, contentLength: Long,
data: Source[ByteString]) extends UniversalEntity
case class Chunked(contentType: ContentType,
chunks: Source[ChunkStreamPart]) extends MessageEntity
case class CloseDelimited(contentType: ContentType,
data: Source[ByteString]) extends ResponseEntity
case class IndefiniteLength(contentType: ContentType,
data: Source[ByteString]) extends BodyPartEntity
}
case class `Accept-Charset`(charsetRanges: immutable.Seq[HttpCharsetRange])
extends HttpHeader
case class `Cache-Control`(directives: immutable.Seq[CacheDirective])
extends HttpHeader
case class `Set-Cookie`(cookie: HttpCookie)
extends HttpHeader
case class RawHeader(name: String, value: String)
extends HttpHeader
import Directives._
val binding = Http().bind("localhost", 8080)
binding startHandlingWith {
path("order" / HexIntNumber) { id =>
get {
complete(s"Received GET for order $id")
} ~
put {
complete(s"Received PUT for order $id")
}
}
}
authorize, cancelRejection, cancelRejections, complete, completeOrRecoverWith, completeWith, compressResponse, compressResponseIfRequested, conditional, cookie, decodeRequest, decompressRequest, delete, deleteCookie, deleteCookie, encodeResponse, entity, extract, extractExecutionContext, extractFlowMaterializer, extractHost, extractLog, extractRequest, extractScheme, extractSettings, extractUnmatchedPath, extractUri, failWith, formField, formFields, get, getFromBrowseableDirectories, getFromBrowseableDirectory, getFromDirectory, getFromFile, getFromResource, getFromResourceDirectory, handleExceptions, handleRejections, handleWith, head, headerValue, headerValueByName, headerValueByType, headerValuePF, host, listDirectoryContents, logRequest, logRequestResult, logResult, mapInnerRoute, mapRejections, mapRequest, mapRequestContext, mapResponse, mapResponseEntity, mapResponseHeaders, mapRouteResult, mapRouteResultFuture, mapRouteResultPF, mapRouteResultWith, mapRouteResultWithPF, mapSettings, mapUnmatchedPath, method, onComplete, onSuccess, optionalCookie, optionalHeaderValue, optionalHeaderValueByName, optionalHeaderValueByType, optionalHeaderValuePF, options, overrideMethodWithParameter, overrideStatusCode, parameter, parameterMap, parameterMultiMap, parameters, parameterSeq, pass, patch, path, pathEnd, pathEndOrSingleSlash, pathPrefix, pathPrefixTest, pathSingleSlash, pathSuffix, pathSuffixTest, post, provide, put, rawPathPrefix, rawPathPrefixTest, recoverRejections, recoverRejectionsWith, redirect, reject, reject, requestEncodedWith, respondWithDefaultHeader, respondWithDefaultHeaders, respondWithHeader, respondWithHeaders, responseEncodingAccepted, scheme, setCookie, textract, tprovide, withExecutionContext, withFlowMaterializer, withLog, withRangeSupport, withSettings
lazy val route =
encodeResponse(Gzip) {
pathEndOrSingleSlash {
get {
redirect("/doc")
}
} ~
pathPrefix("api") {
path("top-articles") {
get {
parameter("max".as[Int]) { max =>
validate(max >= 0, "query parameter 'max' must be >= 0") {
complete {
(topArticlesService ? max).mapTo[Seq[Article]]
}
}
}
}
} ~
tokenAuthenticate { user =>
path("ranking") {
get {
countAndTime(user, "ranking") {
parameters("fixed" ? 0, "mobile" ? 0, "sms" ? 0, "mms" ? 0,
"data" ? 0).as(RankingDescriptor) { descr =>
complete {
(rankingService ? Ranking(descr)).mapTo[RankingResult]
}
}
}
}
} ~
path("accounts") {
post {
authorize(user.isAdmin) {
entity(as[AccountDetails]) { details =>
complete {
(accountService ? NewAccount(details)).mapTo[OpResult]
}
}
}
}
} ~
path("account" / IntNumber) { accountId =>
get { ... } ~
put { ... } ~
delete { ... }
}
}
} ~
pathPrefix("v1") {
proxyToDjango
} ~
pathPrefix("doc") {
respondWithHeader(`Cache-Control`(`max-age`(3600))) {
transformResponse(_.withContentTransformed(markdown2Html)) {
getFromResourceDirectory("doc/root",
pathRewriter = appendFileExt)
}
}
} ~
} ~
cacheIfEnabled {
encodeResponse(Gzip) {
getFromResourceDirectory("public")
}
}