Merge pull request #16414 from spray/wip-16162-mathias

!htp #16162 Upgrade server- and client-side HTTP APIs to "final style"
This commit is contained in:
Roland Kuhn 2014-12-01 17:57:27 +01:00
commit 998f261f51
26 changed files with 772 additions and 755 deletions

View file

@ -6,6 +6,7 @@ package docs.http
import akka.actor.ActorSystem
import akka.http.model._
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
class HttpServerExampleSpec
@ -20,11 +21,9 @@ class HttpServerExampleSpec
implicit val system = ActorSystem()
implicit val materializer = FlowMaterializer()
val Http.ServerSource(source, serverBindingKey) = Http(system).bind(interface = "localhost", port = 8080)
source.foreach {
case Http.IncomingConnection(remoteAddress, flow)
println("Accepted new connection from " + remoteAddress)
val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
for (connection <- serverBinding.connections) {
println("Accepted new connection from " + connection.remoteAddress)
// handle connection here
}
//#bind-example
@ -37,7 +36,7 @@ class HttpServerExampleSpec
implicit val system = ActorSystem()
implicit val materializer = FlowMaterializer()
val Http.ServerSource(source, serverBindingKey) = Http(system).bind(interface = "localhost", port = 8080)
val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
//#full-server-example
import akka.http.model.HttpMethods._
@ -54,13 +53,10 @@ class HttpServerExampleSpec
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
}
// ...
serverBinding.connections foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
source.foreach {
case Http.IncomingConnection(remoteAddress, flow)
println("Accepted new connection from " + remoteAddress)
flow.join(Flow[HttpRequest].map(requestHandler)).run()
connection handleWith { Flow[HttpRequest] map requestHandler }
}
//#full-server-example
}

View file

@ -163,8 +163,4 @@ akka.http {
User-Agent = 32
}
}
# Fully qualified config path which holds the dispatcher configuration
# to be used for the HttpManager.
manager-dispatcher = "akka.actor.default-dispatcher"
}

View file

@ -4,109 +4,218 @@
package akka.http
import java.io.Closeable
import java.net.InetSocketAddress
import com.typesafe.config.Config
import scala.collection.immutable
import scala.concurrent.Future
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.io.Inet
import akka.stream.FlowMaterializer
import akka.stream.io.StreamTcp
import akka.stream.scaladsl._
import scala.collection.immutable
import akka.io.Inet
import akka.http.engine.client.{ HttpClientPipeline, ClientConnectionSettings }
import akka.http.engine.server.{ HttpServerPipeline, ServerSettings }
import akka.http.engine.client.{ HttpClient, ClientConnectionSettings }
import akka.http.engine.server.{ HttpServer, ServerSettings }
import akka.http.model.{ ErrorInfo, HttpResponse, HttpRequest }
import akka.actor._
import scala.concurrent.Future
object Http extends ExtensionKey[HttpExt] with ExtensionIdProvider {
class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.Extension {
import Http._
/**
* A flow representing an outgoing HTTP connection, and the key used to get information about
* the materialized connection. The flow takes pairs of a ``HttpRequest`` and a user definable
* context that will be correlated with the corresponding ``HttpResponse``.
* Creates a [[ServerBinding]] instance which represents a prospective HTTP server binding on the given `endpoint`.
*/
final case class OutgoingFlow(flow: Flow[(HttpRequest, Any), (HttpResponse, Any)],
key: Key { type MaterializedType = Future[Http.OutgoingConnection] })
def bind(interface: String, port: Int = 80, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log): ServerBinding = {
val endpoint = new InetSocketAddress(interface, port)
val effectiveSettings = ServerSettings(settings)
val tcpBinding = StreamTcp().bind(endpoint, backlog, options, effectiveSettings.timeouts.idleTimeout)
new ServerBinding {
def localAddress(mm: MaterializedMap) = tcpBinding.localAddress(mm)
val connections = tcpBinding.connections map { tcpConn
new IncomingConnection {
def localAddress = tcpConn.localAddress
def remoteAddress = tcpConn.remoteAddress
def handleWith(handler: Flow[HttpRequest, HttpResponse])(implicit fm: FlowMaterializer) =
tcpConn.handleWith(HttpServer.serverFlowToTransport(handler, effectiveSettings, log))
}
}
def unbind(mm: MaterializedMap): Future[Unit] = tcpBinding.unbind(mm)
}
}
/**
* The materialized result of an outgoing HTTP connection stream with a single connection as the underlying transport.
* Transforms a given HTTP-level server [[Flow]] into a lower-level TCP transport flow.
*/
final case class OutgoingConnection(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress)
def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse],
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log): Flow[ByteString, ByteString] = {
val effectiveSettings = ServerSettings(settings)
HttpServer.serverFlowToTransport(serverFlow, effectiveSettings, log)
}
/**
* A source representing an bound HTTP server socket, and the key to get information about
* the materialized bound socket.
* Creates an [[OutgoingConnection]] instance representing a prospective HTTP client connection to the given endpoint.
*/
final case class ServerSource(source: Source[IncomingConnection],
key: Key { type MaterializedType = Future[ServerBinding] })
def outgoingConnection(host: String, port: Int = 80,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ClientConnectionSettings] = None,
log: LoggingAdapter = system.log): OutgoingConnection = {
val effectiveSettings = ClientConnectionSettings(settings)
val remoteAddr = new InetSocketAddress(host, port)
val transportFlow = StreamTcp().outgoingConnection(remoteAddr, localAddress,
options, effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout)
new OutgoingConnection {
def remoteAddress = remoteAddr
def localAddress(mm: MaterializedMap) = transportFlow.localAddress(mm)
val flow = HttpClient.transportToConnectionClientFlow(transportFlow.flow, remoteAddr, effectiveSettings, log)
}
}
/**
* An incoming HTTP connection.
* Transforms the given low-level TCP client transport [[Flow]] into a higher-level HTTP client flow.
*/
final case class IncomingConnection(remoteAddress: InetSocketAddress, stream: Flow[HttpResponse, HttpRequest])
def transportToConnectionClientFlow(transport: Flow[ByteString, ByteString],
remoteAddress: InetSocketAddress, // TODO: removed after #16168 is cleared
settings: Option[ClientConnectionSettings] = None,
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse] = {
val effectiveSettings = ClientConnectionSettings(settings)
HttpClient.transportToConnectionClientFlow(transport, remoteAddress, effectiveSettings, log)
}
}
object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
/**
* Represents a prospective HTTP server binding.
*/
trait ServerBinding {
/**
* The local address of the endpoint bound by the materialization of the `connections` [[Source]]
* whose [[MaterializedMap]] is passed as parameter.
*/
def localAddress(materializedMap: MaterializedMap): Future[InetSocketAddress]
/**
* The stream of accepted incoming connections.
* Can be materialized several times but only one subscription can be "live" at one time, i.e.
* subsequent materializations will reject subscriptions with an [[StreamTcp.BindFailedException]] if the previous
* materialization still has an uncancelled subscription.
* Cancelling the subscription to a materialization of this source will cause the listening port to be unbound.
*/
def connections: Source[IncomingConnection]
/**
* Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
* [[Source]] whose [[MaterializedMap]] is passed as parameter.
*
* The produced [[Future]] is fulfilled when the unbinding has been completed.
*/
def unbind(materializedMap: MaterializedMap): Future[Unit]
/**
* Materializes the `connections` [[Source]] and handles all connections with the given flow.
*
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
* connections are being accepted at maximum rate, which, depending on the applications, might
* present a DoS risk!
*/
def startHandlingWith(handler: Flow[HttpRequest, HttpResponse])(implicit fm: FlowMaterializer): MaterializedMap =
connections.to(ForeachSink(_ handleWith handler)).run()
/**
* Materializes the `connections` [[Source]] and handles all connections with the given flow.
*
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
* connections are being accepted at maximum rate, which, depending on the applications, might
* present a DoS risk!
*/
def startHandlingWithSyncHandler(handler: HttpRequest HttpResponse)(implicit fm: FlowMaterializer): MaterializedMap =
startHandlingWith(Flow[HttpRequest].map(handler))
/**
* Materializes the `connections` [[Source]] and handles all connections with the given flow.
*
* Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all
* connections are being accepted at maximum rate, which, depending on the applications, might
* present a DoS risk!
*/
def startHandlingWithAsyncHandler(handler: HttpRequest Future[HttpResponse])(implicit fm: FlowMaterializer): MaterializedMap =
startHandlingWith(Flow[HttpRequest].mapAsync(handler))
}
/**
* Represents one accepted incoming HTTP connection.
*/
sealed trait IncomingConnection {
/**
* The local address this connection is bound to.
*/
def localAddress: InetSocketAddress
/**
* The remote address this connection is bound to.
*/
def remoteAddress: InetSocketAddress
/**
* Handles the connection with the given flow, which is materialized exactly once
* and the respective [[MaterializedMap]] returned.
*/
def handleWith(handler: Flow[HttpRequest, HttpResponse])(implicit fm: FlowMaterializer): MaterializedMap
/**
* Handles the connection with the given handler function.
* Returns the [[MaterializedMap]] of the underlying flow materialization.
*/
def handleWithSyncHandler(handler: HttpRequest HttpResponse)(implicit fm: FlowMaterializer): MaterializedMap =
handleWith(Flow[HttpRequest].map(handler))
/**
* Handles the connection with the given handler function.
* Returns the [[MaterializedMap]] of the underlying flow materialization.
*/
def handleWithAsyncHandler(handler: HttpRequest Future[HttpResponse])(implicit fm: FlowMaterializer): MaterializedMap =
handleWith(Flow[HttpRequest].mapAsync(handler))
}
/**
* Represents a prospective outgoing HTTP connection.
*/
sealed trait OutgoingConnection {
/**
* The remote address this connection is or will be bound to.
*/
def remoteAddress: InetSocketAddress
/**
* The local address of the endpoint bound by the materialization of the connection materialization
* whose [[MaterializedMap]] is passed as parameter.
*/
def localAddress(mMap: MaterializedMap): Future[InetSocketAddress]
/**
* A flow representing the HTTP server on a single HTTP connection.
* This flow can be materialized several times, every materialization will open a new connection to the `remoteAddress`.
* If the connection cannot be established the materialized stream will immediately be terminated
* with a [[StreamTcp.ConnectionAttemptFailedException]].
*/
def flow: Flow[HttpRequest, HttpResponse]
}
class RequestTimeoutException(val request: HttpRequest, message: String) extends RuntimeException(message)
class StreamException(val info: ErrorInfo) extends RuntimeException(info.summary)
/**
* The materialized result of a bound HTTP server socket.
*/
private[akka] sealed abstract case class ServerBinding(localAddress: InetSocketAddress) extends Closeable
//////////////////// EXTENSION SETUP ///////////////////
/**
* INTERNAL API
*/
private[akka] object ServerBinding {
def apply(localAddress: InetSocketAddress, closeable: Closeable): ServerBinding =
new ServerBinding(localAddress) {
override def close() = closeable.close()
}
}
}
def apply()(implicit system: ActorSystem): HttpExt = super.apply(system)
class HttpExt(system: ExtendedActorSystem) extends Extension {
@volatile private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline]
def lookup() = Http
def connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress],
options: immutable.Traversable[Inet.SocketOption],
settings: Option[ClientConnectionSettings]): Http.OutgoingFlow = {
// FIXME #16378 Where to do logging? log.debug("Attempting connection to {}", remoteAddress)
val effectiveSettings = ClientConnectionSettings(settings)(system)
val tcpFlow = StreamTcp(system).connect(remoteAddress, localAddress, options, effectiveSettings.connectingTimeout)
val pipeline = clientPipelines.getOrElse(effectiveSettings, {
val pl = new HttpClientPipeline(effectiveSettings, system.log)(system.dispatcher)
clientPipelines = clientPipelines.updated(effectiveSettings, pl)
pl
})
pipeline(tcpFlow, remoteAddress)
}
def connect(host: String, port: Int = 80,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ClientConnectionSettings] = None): Http.OutgoingFlow =
connect(new InetSocketAddress(host, port), localAddress, options, settings)
def bind(endpoint: InetSocketAddress,
backlog: Int,
options: immutable.Traversable[Inet.SocketOption],
serverSettings: Option[ServerSettings]): Http.ServerSource = {
import system.dispatcher
// FIXME IdleTimeout?
val src = StreamTcp(system).bind(endpoint, backlog, options)
val key = new Key {
override type MaterializedType = Future[Http.ServerBinding]
override def materialize(map: MaterializedMap) = map.get(src).map(s Http.ServerBinding(s.localAddress, s))
}
val log = system.log
val effectiveSettings = ServerSettings(serverSettings)(system)
Http.ServerSource(src.withKey(key).map(new HttpServerPipeline(effectiveSettings, log)), key)
}
def bind(interface: String, port: Int = 80, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
serverSettings: Option[ServerSettings] = None): Http.ServerSource =
bind(new InetSocketAddress(interface, port), backlog, options, serverSettings)
}
def createExtension(system: ExtendedActorSystem): HttpExt =
new HttpExt(system.settings.config getConfig "akka.http")(system)
}

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.client
import java.net.InetSocketAddress
import scala.collection.immutable.Queue
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
import akka.http.model.{ HttpMethod, HttpRequest, HttpResponse }
import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
import akka.http.engine.parsing.{ HttpHeaderParser, HttpResponseParser }
import akka.http.engine.parsing.ParserOutput._
import akka.http.util._
/**
* INTERNAL API
*/
private[http] object HttpClient {
def transportToConnectionClientFlow(transport: Flow[ByteString, ByteString],
remoteAddress: InetSocketAddress,
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse] = {
import settings._
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpResponseParser(
parserSettings,
HttpHeaderParser(parserSettings) { errorInfo
if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty)
})
val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log)
val requestMethodByPass = new RequestMethodByPass(remoteAddress)
Flow[HttpRequest]
.map(requestMethodByPass)
.transform("renderer", () requestRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing request stream error"))
.via(transport)
.transform("rootParser", ()
// each connection uses a single (private) response parser instance for all its responses
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(requestMethodByPass))
.splitWhen(_.isInstanceOf[MessageStart])
.headAndTail
.collect {
case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts)
HttpResponse(statusCode, headers, createEntity(entityParts), protocol)
}
}
// FIXME: refactor to a pure-stream design that allows us to get rid of this ad-hoc queue here
class RequestMethodByPass(serverAddress: InetSocketAddress)
extends (HttpRequest RequestRenderingContext) with (() HttpMethod) {
private[this] var requestMethods = Queue.empty[HttpMethod]
def apply(request: HttpRequest) = {
requestMethods = requestMethods.enqueue(request.method)
RequestRenderingContext(request, serverAddress)
}
def apply(): HttpMethod =
if (requestMethods.nonEmpty) {
val method = requestMethods.head
requestMethods = requestMethods.tail
method
} else HttpResponseParser.NoMethod
}
}

View file

@ -1,111 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.client
import java.net.InetSocketAddress
import scala.collection.immutable.Queue
import akka.stream.scaladsl._
import akka.event.LoggingAdapter
import akka.stream.FlattenStrategy
import akka.stream.io.StreamTcp
import akka.util.ByteString
import akka.http.Http
import akka.http.model.{ HttpMethod, HttpRequest, ErrorInfo, HttpResponse }
import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
import akka.http.engine.parsing.{ HttpRequestParser, HttpHeaderParser, HttpResponseParser }
import akka.http.engine.parsing.ParserOutput._
import akka.http.util._
import scala.concurrent.{ ExecutionContext, Future }
/**
* INTERNAL API
*/
private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettings,
log: LoggingAdapter)(implicit ec: ExecutionContext)
extends ((StreamTcp.OutgoingTcpFlow, InetSocketAddress) Http.OutgoingFlow) {
import effectiveSettings._
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpResponseParser(
parserSettings,
HttpHeaderParser(parserSettings) { errorInfo
if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty)
})
val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log)
def apply(tcpFlow: StreamTcp.OutgoingTcpFlow, remoteAddress: InetSocketAddress): Http.OutgoingFlow = {
import FlowGraphImplicits._
val httpKey = new HttpKey(tcpFlow.key)
val flowWithHttpKey = tcpFlow.flow.withKey(httpKey)
val requestMethodByPass = new RequestMethodByPass(remoteAddress)
val pipeline = Flow() { implicit b
val userIn = UndefinedSource[(HttpRequest, Any)]
val userOut = UndefinedSink[(HttpResponse, Any)]
val bypassFanout = Unzip[HttpRequest, Any]("bypassFanout")
val bypassFanin = Zip[HttpResponse, Any]("bypassFanin")
val requestPipeline =
Flow[HttpRequest]
.map(requestMethodByPass)
.transform("renderer", () requestRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing request stream error"))
val responsePipeline =
Flow[ByteString]
.transform("rootParser", ()
// each connection uses a single (private) response parser instance for all its responses
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(requestMethodByPass))
.splitWhen(_.isInstanceOf[MessageStart])
.headAndTail
.collect {
case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts)
HttpResponse(statusCode, headers, createEntity(entityParts), protocol)
}
//FIXME: the graph is unnecessary after fixing #15957
userIn ~> bypassFanout.in
bypassFanout.left ~> requestPipeline ~> flowWithHttpKey ~> responsePipeline ~> bypassFanin.left
bypassFanout.right ~> bypassFanin.right
bypassFanin.out ~> userOut
userIn -> userOut
}
Http.OutgoingFlow(pipeline, httpKey)
}
class RequestMethodByPass(serverAddress: InetSocketAddress)
extends ((HttpRequest) RequestRenderingContext) with (() HttpMethod) {
private[this] var requestMethods = Queue.empty[HttpMethod]
def apply(request: HttpRequest) = {
requestMethods = requestMethods.enqueue(request.method)
RequestRenderingContext(request, serverAddress)
}
def apply(): HttpMethod =
if (requestMethods.nonEmpty) {
val method = requestMethods.head
requestMethods = requestMethods.tail
method
} else HttpResponseParser.NoMethod
}
class HttpKey(tcpKey: Key { type MaterializedType = Future[StreamTcp.OutgoingTcpConnection] }) extends Key {
type MaterializedType = Future[Http.OutgoingConnection]
override def materialize(map: MaterializedMap): MaterializedType =
map.get(tcpKey).map(tcp Http.OutgoingConnection(tcp.remoteAddress, tcp.localAddress))
}
}

View file

@ -1,28 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.client
import akka.http.model.{ HttpResponse, HttpRequest }
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
/**
* A `HttpClientProcessor` models an HTTP client as a stream processor that provides
* responses for requests with an attached context object of a custom type,
* which is funneled through and completely transparent to the processor itself.
*/
trait HttpClientProcessor[T] extends Processor[(HttpRequest, T), (HttpResponse, T)]
object HttpClientProcessor {
def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)],
responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] =
new HttpClientProcessor[T] {
override def subscribe(s: Subscriber[_ >: (HttpResponse, T)]): Unit = responsePublisher.subscribe(s)
override def onError(t: Throwable): Unit = requestSubscriber.onError(t)
override def onSubscribe(s: Subscription): Unit = requestSubscriber.onSubscribe(s)
override def onComplete(): Unit = requestSubscriber.onComplete()
override def onNext(t: (HttpRequest, T)): Unit = requestSubscriber.onNext(t)
}
}

View file

@ -4,68 +4,40 @@
package akka.http.engine.server
import akka.actor.{ Props, ActorRef }
import akka.actor.{ ActorRef, Props }
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.stage.PushPullStage
import akka.util.ByteString
import akka.stream.io.StreamTcp
import akka.stream.FlattenStrategy
import akka.stream.FlowMaterializer
import akka.stream.scaladsl._
import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser }
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.model._
import akka.http.engine.parsing.ParserOutput._
import akka.http.Http
import akka.http.model._
import akka.http.util._
import akka.http.Http
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAdapter)
extends (StreamTcp.IncomingTcpConnection Http.IncomingConnection) {
import settings.parserSettings
private[http] object HttpServer {
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpRequestParser(
parserSettings,
settings.rawRequestUriHeader,
HttpHeaderParser(parserSettings) { errorInfo
if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty)
})
def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse],
settings: ServerSettings,
log: LoggingAdapter): Flow[ByteString, ByteString] = {
val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log)
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpRequestParser(
settings.parserSettings,
settings.rawRequestUriHeader,
HttpHeaderParser(settings.parserSettings) { errorInfo
if (settings.parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty)
})
val bypassFanout = Broadcast[RequestOutput]("bypassFanout")
val bypassMerge = new BypassMerge
val requestPreparation =
Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
.collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
val rendererPipeline =
Flow[ResponseRenderingContext]
.transform("recover", () new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed
.transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
def apply(tcpConn: StreamTcp.IncomingTcpConnection): Http.IncomingConnection = {
import FlowGraphImplicits._
val userIn = UndefinedSink[HttpRequest]
val userOut = UndefinedSource[HttpResponse]
val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log)
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source[OneHundredContinue.type] {
@ -76,28 +48,50 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda
}
}
// FIXME The whole pipeline can maybe be created up front when #16168 is fixed
val pipeline = Flow() { implicit b
val bypassFanout = Broadcast[RequestOutput]("bypassFanout")
val bypassMerge = new BypassMerge(settings, log)
val requestParsing = Flow[ByteString].transform("rootParser", ()
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef))
val requestParsing = Flow[ByteString].transform("rootParser", ()
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef))
val requestPreparation =
Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
.collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
val rendererPipeline =
Flow[ResponseRenderingContext]
.transform("recover", () new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed
.transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
val transportIn = UndefinedSource[ByteString]
val transportOut = UndefinedSink[ByteString]
import FlowGraphImplicits._
Flow() { implicit b
//FIXME: the graph is unnecessary after fixing #15957
userOut ~> bypassMerge.applicationInput ~> rendererPipeline ~> tcpConn.stream ~> requestParsing ~> bypassFanout ~> requestPreparation ~> userIn
transportIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> serverFlow ~> bypassMerge.applicationInput ~> rendererPipeline ~> transportOut
bypassFanout ~> bypassMerge.bypassInput
oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput
b.allowCycles()
userOut -> userIn
transportIn -> transportOut
}
Http.IncomingConnection(tcpConn.remoteAddress, pipeline)
}
class BypassMerge extends FlexiMerge[ResponseRenderingContext]("BypassMerge") {
class BypassMerge(settings: ServerSettings, log: LoggingAdapter) extends FlexiMerge[ResponseRenderingContext]("BypassMerge") {
import FlexiMerge._
val bypassInput = createInputPort[RequestOutput]()
val oneHundredContinueInput = createInputPort[OneHundredContinue.type]()

View file

@ -12,6 +12,7 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.actor.ActorSystem
import akka.stream.io.StreamTcp
import akka.stream.FlowMaterializer
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe }
@ -36,45 +37,39 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
"The server-side HTTP infrastructure" should {
"properly bind and unbind a server" in {
"properly bind a server" in {
val (hostname, port) = temporaryServerHostnameAndPort()
val Http.ServerSource(source, key) = Http(system).bind(hostname, port)
val c = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm = source.to(Sink(c)).run()
val Http.ServerBinding(localAddress) = Await.result(mm.get(key), 3.seconds)
val sub = c.expectSubscription()
localAddress.getHostName shouldEqual hostname
localAddress.getPort shouldEqual port
val binding = Http().bind(hostname, port)
val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
binding.connections.runWith(Sink(probe))
val sub = probe.expectSubscription() // if we get it we are bound
sub.cancel()
// TODO: verify unbinding effect
}
"report failure if bind fails" in {
"report failure if bind fails" in pendingUntilFixed { // FIXME: "unpend"!
val (hostname, port) = temporaryServerHostnameAndPort()
val Http.ServerSource(source, key) = Http(system).bind(hostname, port)
val c1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm1 = source.to(Sink(c1)).run()
val sub = c1.expectSubscription()
val Http.ServerBinding(localAddress) = Await.result(mm1.get(key), 3.seconds)
localAddress.getHostName shouldEqual hostname
localAddress.getPort shouldEqual port
val c2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm2 = source.to(Sink(c2)).run()
val failure = intercept[akka.stream.io.StreamTcp.IncomingTcpException] {
val serverBinding = Await.result(mm2.get(key), 3.seconds)
}
failure.getMessage should be("Bind failed")
sub.cancel()
val binding = Http().bind(hostname, port)
val probe1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm1 = binding.connections.to(Sink(probe1)).run()
probe1.expectSubscription()
val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
binding.connections.runWith(Sink(probe2))
probe2.expectError(StreamTcp.BindFailedException)
Await.result(binding.unbind(mm1), 1.second)
val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm3 = binding.connections.to(Sink(probe3)).run()
probe3.expectSubscription() // we bound a second time, which means the previous unbind was successful
Await.result(binding.unbind(mm3), 1.second)
}
"properly complete a simple request/response cycle" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection[Symbol]()
val (clientOut, clientIn) = openNewClientConnection()
val (serverIn, serverOut) = acceptConnection()
val clientOutSub = clientOut.expectSubscription()
clientOutSub.sendNext(HttpRequest(uri = "/abc") -> 'abcContext)
clientOutSub.sendNext(HttpRequest(uri = "/abc"))
val serverInSub = serverIn.expectSubscription()
serverInSub.request(1)
@ -85,12 +80,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val clientInSub = clientIn.expectSubscription()
clientInSub.request(1)
val (response, 'abcContext) = clientIn.expectNext()
val response = clientIn.expectNext()
toStrict(response.entity) shouldEqual HttpEntity("yeah")
}
"properly complete a chunked request/response cycle" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection[Long]()
val (clientOut, clientIn) = openNewClientConnection()
val (serverIn, serverOut) = acceptConnection()
val chunks = List(Chunk("abc"), Chunk("defg"), Chunk("hijkl"), LastChunk)
@ -98,7 +93,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val chunkedEntity = HttpEntity.Chunked(chunkedContentType, Source(chunks))
val clientOutSub = clientOut.expectSubscription()
clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity) -> 12345678)
clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity))
val serverInSub = serverIn.expectSubscription()
serverInSub.request(1)
@ -112,8 +107,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val clientInSub = clientIn.expectSubscription()
clientInSub.request(1)
val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")),
Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext()
val HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")),
Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext()
Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks
}
@ -126,35 +121,35 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
def configOverrides = ""
// automatically bind a server
val connectionStream: SubscriberProbe[Http.IncomingConnection] = {
val connSource = {
val settings = configOverrides.toOption.map(ServerSettings.apply)
val Http.ServerSource(source, key) = Http(system).bind(hostname, port, serverSettings = settings)
val binding = Http().bind(hostname, port, settings = settings)
val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]
source.to(Sink(probe)).run()
binding.connections.runWith(Sink(probe))
probe
}
val connectionStreamSub = connectionStream.expectSubscription()
val connSourceSub = connSource.expectSubscription()
def openNewClientConnection[T](settings: Option[ClientConnectionSettings] = None): (PublisherProbe[(HttpRequest, T)], SubscriberProbe[(HttpResponse, T)]) = {
val outgoingFlow = Http(system).connect(hostname, port, settings = settings)
val requestPublisherProbe = StreamTestKit.PublisherProbe[(HttpRequest, T)]()
val responseSubscriberProbe = StreamTestKit.SubscriberProbe[(HttpResponse, T)]()
val tflow = outgoingFlow.flow.asInstanceOf[Flow[((HttpRequest, T)), ((HttpResponse, T))]]
val mm = Flow(Sink(responseSubscriberProbe), Source(requestPublisherProbe)).join(tflow).run()
val connection = Await.result(mm.get(outgoingFlow.key), 3.seconds)
connection.remoteAddress.getPort shouldEqual port
def openNewClientConnection(settings: Option[ClientConnectionSettings] = None): (PublisherProbe[HttpRequest], SubscriberProbe[HttpResponse]) = {
val requestPublisherProbe = StreamTestKit.PublisherProbe[HttpRequest]()
val responseSubscriberProbe = StreamTestKit.SubscriberProbe[HttpResponse]()
val connection = Http().outgoingConnection(hostname, port, settings = settings)
connection.remoteAddress.getHostName shouldEqual hostname
connection.remoteAddress.getPort shouldEqual port
Source(requestPublisherProbe).via(connection.flow).runWith(Sink(responseSubscriberProbe))
requestPublisherProbe -> responseSubscriberProbe
}
def acceptConnection(): (SubscriberProbe[HttpRequest], PublisherProbe[HttpResponse]) = {
connectionStreamSub.request(1)
val Http.IncomingConnection(remoteAddress, flow) = connectionStream.expectNext()
connSourceSub.request(1)
val incomingConnection = connSource.expectNext()
val sink = PublisherSink[HttpRequest]()
val source = SubscriberSource[HttpResponse]()
val mm = incomingConnection.handleWith(Flow(sink, source))
val requestSubscriberProbe = StreamTestKit.SubscriberProbe[HttpRequest]()
val responsePublisherProbe = StreamTestKit.PublisherProbe[HttpResponse]()
Flow(Sink(requestSubscriberProbe), Source(responsePublisherProbe)).join(flow).run()
mm.get(sink).subscribe(requestSubscriberProbe)
responsePublisherProbe.subscribe(mm.get(source))
requestSubscriberProbe -> responsePublisherProbe
}

View file

@ -17,15 +17,15 @@ object TestClient extends App {
akka.log-dead-letters = off
""")
implicit val system = ActorSystem("ServerTest", testConf)
import akka.http.TestClient.system.dispatcher
implicit val fm = FlowMaterializer()
import system.dispatcher
implicit val materializer = FlowMaterializer()
val host = "spray.io"
println(s"Fetching HTTP server version of host `$host` ...")
val outgoingFlow = Http(system).connect(host)
val result = Source.singleton(HttpRequest() -> 'NoContext).via(outgoingFlow.flow).map(_._1).runWith(Sink.head)
val connection = Http().outgoingConnection(host)
val result = Source.singleton(HttpRequest()).via(connection.flow).runWith(Sink.head)
result.map(_.header[headers.Server]) onComplete {
case Success(res) println(s"$host is running ${res mkString ", "}")

View file

@ -7,11 +7,9 @@ package akka.http
import akka.actor.ActorSystem
import akka.http.model._
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import akka.stream.scaladsl.Flow
import com.typesafe.config.{ ConfigFactory, Config }
import HttpMethods._
import scala.concurrent.Await
import scala.concurrent.duration._
object TestServer extends App {
val testConf: Config = ConfigFactory.parseString("""
@ -19,30 +17,21 @@ object TestServer extends App {
akka.log-dead-letters = off
""")
implicit val system = ActorSystem("ServerTest", testConf)
implicit val fm = FlowMaterializer()
val requestHandler: HttpRequest HttpResponse = {
val binding = Http().bind(interface = "localhost", port = 8080)
binding startHandlingWithSyncHandler {
case HttpRequest(GET, Uri.Path("/"), _, _, _) index
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) sys.error("BOOM!")
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
}
implicit val materializer = FlowMaterializer()
val Http.ServerSource(source, key) = Http(system).bind(interface = "localhost", port = 8080)
val materializedMap = source.to(Sink.foreach {
case Http.IncomingConnection(remoteAddress, flow)
println("Accepted new connection from " + remoteAddress)
flow.join(Flow[HttpRequest].map(requestHandler)).run()
}).run()
val serverBinding = Await.result(materializedMap.get(key), 3 seconds)
println(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}")
println(s"Server online at http://localhost:8080")
println("Press RETURN to stop...")
Console.readLine()
serverBinding.close()
system.shutdown()
////////////// helpers //////////////

View file

@ -10,9 +10,7 @@ import akka.event.NoLogging
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.FlowMaterializer
import akka.stream.io.StreamTcp
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.http.Http
import akka.http.model._
import akka.http.util._
import headers._
@ -20,7 +18,7 @@ import HttpEntity._
import MediaTypes._
import HttpMethods._
class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside {
class HttpServerSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside {
implicit val materializer = FlowMaterializer()
"The server implementation" should {
@ -612,14 +610,18 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
}
class TestSetup {
val netIn = StreamTestKit.PublisherProbe[ByteString]
val netOut = StreamTestKit.SubscriberProbe[ByteString]
val tcpConnection = StreamTcp.IncomingTcpConnection(null, Flow(Sink(netOut), Source(netIn)))
val requests = StreamTestKit.SubscriberProbe[HttpRequest]
val responses = StreamTestKit.PublisherProbe[HttpResponse]
def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test")))))
val pipeline = new HttpServerPipeline(settings, NoLogging)
val Http.IncomingConnection(_, httpPipelineFlow) = pipeline(tcpConnection)
val (netIn, netOut) = {
val netIn = StreamTestKit.PublisherProbe[ByteString]
val netOut = StreamTestKit.SubscriberProbe[ByteString]
val transportFlow = HttpServer.serverFlowToTransport(Flow(Sink(requests), Source(responses)), settings, NoLogging)
Source(netIn).via(transportFlow).runWith(Sink(netOut))
netIn -> netOut
}
def wipeDate(string: String) =
string.fastSplit('\n').map {
@ -627,10 +629,6 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
case s s
}.mkString("\n")
val requests = StreamTestKit.SubscriberProbe[HttpRequest]
val responses = StreamTestKit.PublisherProbe[HttpResponse]
Flow(Sink(requests), Source(responses)).join(httpPipelineFlow).run()
val netInSub = netIn.expectSubscription()
val netOutSub = netOut.expectSubscription()
val requestsSub = requests.expectSubscription()
@ -647,4 +645,4 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
def closeNetworkInput(): Unit = netInSub.sendComplete()
}
}
}

View file

@ -15,7 +15,7 @@ import akka.http.server._
import akka.http.model._
import StatusCodes._
import HttpMethods._
import ScalaRoutingDSL._
import Directives._
class ScalatestRouteTestSpec extends FreeSpec with Matchers with ScalatestRouteTest {

View file

@ -7,7 +7,6 @@ package akka.http.server
import akka.http.model
import model.HttpMethods._
import model.StatusCodes
import akka.http.server.PathMatchers.{ Segment, IntNumber }
class BasicRouteSpecs extends RoutingSpec {
@ -40,7 +39,6 @@ class BasicRouteSpecs extends RoutingSpec {
val stringDirective = provide("The cat")
val intDirective = provide(42)
val doubleDirective = provide(23.0)
val symbolDirective = provide('abc)
val dirStringInt = stringDirective & intDirective
val dirStringIntDouble = dirStringInt & doubleDirective
@ -78,7 +76,7 @@ class BasicRouteSpecs extends RoutingSpec {
}
"Route disjunction" should {
"work in the happy case" in {
val route = sealRoute((path("abc") | path("def")) {
val route = Route.seal((path("abc") | path("def")) {
completeOk
})
@ -138,7 +136,7 @@ class BasicRouteSpecs extends RoutingSpec {
case object MyException extends RuntimeException
"Route sealing" should {
"catch route execution exceptions" in {
Get("/abc") ~> ScalaRoutingDSL.sealRoute {
Get("/abc") ~> Route.seal {
get { ctx
throw MyException
}
@ -147,7 +145,7 @@ class BasicRouteSpecs extends RoutingSpec {
}
}
"catch route building exceptions" in {
Get("/abc") ~> ScalaRoutingDSL.sealRoute {
Get("/abc") ~> Route.seal {
get {
throw MyException
}
@ -157,7 +155,7 @@ class BasicRouteSpecs extends RoutingSpec {
}
"convert all rejections to responses" in {
object MyRejection extends Rejection
Get("/abc") ~> ScalaRoutingDSL.sealRoute {
Get("/abc") ~> Route.seal {
get {
reject(MyRejection)
}

View file

@ -8,7 +8,7 @@ import org.scalatest.{ WordSpec, Suite, Matchers }
import akka.http.model.HttpResponse
import akka.http.testkit.ScalatestRouteTest
trait GenericRoutingSpec extends Matchers with ScalaRoutingDSL with ScalatestRouteTest { this: Suite
trait GenericRoutingSpec extends Matchers with Directives with ScalatestRouteTest { this: Suite
val Ok = HttpResponse()
val completeOk = complete(Ok)

View file

@ -7,26 +7,20 @@ package akka.http.server
import akka.http.marshallers.xml.ScalaXmlSupport
import akka.http.server.directives.AuthenticationDirectives._
import com.typesafe.config.{ ConfigFactory, Config }
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.util.Timeout
import akka.http.Http
import akka.http.model._
object TestServer extends App {
val testConf: Config = ConfigFactory.parseString("""
akka.loglevel = INFO
akka.log-dead-letters = off
""")
akka.log-dead-letters = off""")
implicit val system = ActorSystem("ServerTest", testConf)
import system.dispatcher
implicit val materializer = FlowMaterializer()
implicit val askTimeout: Timeout = 500.millis
val serverSource = Http(system).bind(interface = "localhost", port = 8080)
import ScalaRoutingDSL._
import ScalaXmlSupport._
import Directives._
def auth =
HttpBasicAuthenticator.provideUserName {
@ -34,11 +28,9 @@ object TestServer extends App {
case _ false
}
// FIXME: a simple `import ScalaXmlSupport._` should suffice but currently doesn't because
// of #16190
implicit val html = ScalaXmlSupport.nodeSeqMarshaller(MediaTypes.`text/html`)
val binding = Http().bind(interface = "localhost", port = 8080)
handleConnections(serverSource) withRoute {
val materializedMap = binding startHandlingWith {
get {
path("") {
complete(index)
@ -58,9 +50,8 @@ object TestServer extends App {
}
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
Console.readLine()
system.shutdown()
binding.unbind(materializedMap).onComplete(_ system.shutdown())
lazy val index =
<html>

View file

@ -31,7 +31,7 @@ class AuthenticationDirectivesSpec extends RoutingSpec {
} ~> check { rejection shouldEqual AuthenticationFailedRejection(CredentialsRejected, challenge) }
}
"reject requests with illegal Authorization header with 401" in {
Get() ~> RawHeader("Authorization", "bob alice") ~> sealRoute {
Get() ~> RawHeader("Authorization", "bob alice") ~> Route.seal {
dontAuth { echoComplete }
} ~> check {
status shouldEqual StatusCodes.Unauthorized
@ -52,7 +52,7 @@ class AuthenticationDirectivesSpec extends RoutingSpec {
"properly handle exceptions thrown in its inner route" in {
object TestException extends RuntimeException
Get() ~> Authorization(BasicHttpCredentials("Alice", "")) ~> {
sealRoute {
Route.seal {
doAuth { _ throw TestException }
}
} ~> check { status shouldEqual StatusCodes.InternalServerError }
@ -66,7 +66,7 @@ class AuthenticationDirectivesSpec extends RoutingSpec {
}
val bothAuth = dontAuth | otherAuth
Get() ~> sealRoute {
Get() ~> Route.seal {
bothAuth { echoComplete }
} ~> check {
status shouldEqual StatusCodes.Unauthorized

View file

@ -71,7 +71,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside {
}
"be transparent to non-200 responses" in {
Get() ~> addHeader(Range(ByteRange(1, 2))) ~> sealRoute(wrs(reject())) ~> check {
Get() ~> addHeader(Range(ByteRange(1, 2))) ~> Route.seal(wrs(reject())) ~> check {
status == NotFound
headers.exists { case `Content-Range`(_, _) true; case _ false } shouldEqual false
}

View file

@ -0,0 +1,56 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.server
import scala.concurrent.Future
import akka.stream.scaladsl.Flow
import akka.http.model.{ HttpRequest, HttpResponse }
import akka.http.util.FastFuture._
object Route {
/**
* Helper for constructing a Route from a function literal.
*/
def apply(f: Route): Route = f
/**
* "Seals" a route by wrapping it with exception handling and rejection conversion.
*/
def seal(route: Route)(implicit setup: RoutingSetup): Route = {
import directives.ExecutionDirectives._
import setup._
val sealedExceptionHandler =
if (exceptionHandler.isDefault) exceptionHandler
else exceptionHandler orElse ExceptionHandler.default(settings)
val sealedRejectionHandler =
if (rejectionHandler.isDefault) rejectionHandler
else rejectionHandler orElse RejectionHandler.default
handleExceptions(sealedExceptionHandler) {
handleRejections(sealedRejectionHandler) {
route
}
}
}
/**
* Turns a `Route` into an server flow.
*/
def handlerFlow(route: Route)(implicit setup: RoutingSetup): Flow[HttpRequest, HttpResponse] =
Flow[HttpRequest].mapAsync(asyncHandler(route))
/**
* Turns a `Route` into an async handler function.
*/
def asyncHandler(route: Route)(implicit setup: RoutingSetup): HttpRequest Future[HttpResponse] = {
import setup._
val sealedRoute = seal(route)
request
sealedRoute(new RequestContextImpl(request, routingLog.requestLog(request), setup.settings)).fast.map {
case RouteResult.Complete(response) response
case RouteResult.Rejected(rejected) throw new IllegalStateException(s"Unhandled rejections '$rejected', unsealed RejectionHandler?!")
}
}
}

View file

@ -5,8 +5,8 @@
package akka.http.server
import scala.collection.immutable
import akka.http.model.HttpResponse
import akka.stream.scaladsl.Flow
import akka.http.model.{ HttpRequest, HttpResponse }
/**
* The result of handling a request.
@ -19,4 +19,7 @@ sealed trait RouteResult
object RouteResult {
final case class Complete(response: HttpResponse) extends RouteResult
final case class Rejected(rejections: immutable.Seq[Rejection]) extends RouteResult
implicit def route2HandlerFlow(route: Route)(implicit setup: RoutingSetup): Flow[HttpRequest, HttpResponse] =
Route.handlerFlow(route)
}

View file

@ -4,13 +4,12 @@
package akka.http.server
import akka.stream.FlowMaterializer
import scala.concurrent.ExecutionContext
import akka.actor.{ ActorSystem, ActorContext }
import akka.event.LoggingAdapter
import akka.http.model.HttpRequest
import akka.actor.{ ActorSystem, ActorContext }
import akka.stream.FlowMaterializer
import akka.http.Http
import akka.http.model.HttpRequest
/**
* Provides a ``RoutingSetup`` for a given connection.

View file

@ -1,80 +0,0 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.server
import scala.concurrent.{ ExecutionContext, Future }
import akka.stream.scaladsl._
import akka.stream.FlowMaterializer
import akka.http.util.FastFuture
import akka.http.model.{ HttpRequest, HttpResponse }
import akka.http.Http
import FastFuture._
/**
* The main entry point into the Scala routing DSL.
*
* `import ScalaRoutingDSL._` to bring everything required into scope.
*/
trait ScalaRoutingDSL extends Directives {
sealed trait Applicator[R] {
def withRoute(route: Route): R
def withSyncHandler(handler: HttpRequest HttpResponse): R
def withAsyncHandler(handler: HttpRequest Future[HttpResponse]): R
}
def handleConnections(serverSource: Http.ServerSource)(implicit fm: FlowMaterializer,
setupProvider: RoutingSetupProvider): Applicator[Unit] = {
new Applicator[Unit] {
def withRoute(route: Route): Unit =
run(routeRunner(route, _))
def withSyncHandler(handler: HttpRequest HttpResponse): Unit =
withAsyncHandler(request FastFuture.successful(handler(request)))
def withAsyncHandler(handler: HttpRequest Future[HttpResponse]): Unit =
run(_ handler)
private def run(f: RoutingSetup HttpRequest Future[HttpResponse]): Unit =
serverSource.source.foreach {
case connection @ Http.IncomingConnection(remoteAddress, flow)
val setup = setupProvider(connection)
setup.routingLog.log.debug("Accepted new connection from " + remoteAddress)
val runner = f(setup)
flow.join(Flow[HttpRequest].mapAsync(request runner(request))).run()(fm)
}
}
}
def routeRunner(route: Route, setup: RoutingSetup): HttpRequest Future[HttpResponse] = {
import setup._
val sealedRoute = sealRoute(route)(setup)
request
sealedRoute(new RequestContextImpl(request, routingLog.requestLog(request), setup.settings)).fast.map {
case RouteResult.Complete(response) response
case RouteResult.Rejected(rejected) throw new IllegalStateException(s"Unhandled rejections '$rejected', unsealed RejectionHandler?!")
}
}
/**
* "Seals" a route by wrapping it with exception handling and rejection conversion.
*/
def sealRoute(route: Route)(implicit setup: RoutingSetup): Route = {
import setup._
val sealedExceptionHandler =
if (exceptionHandler.isDefault) exceptionHandler
else exceptionHandler orElse ExceptionHandler.default(settings)
val sealedRejectionHandler =
if (rejectionHandler.isDefault) rejectionHandler
else rejectionHandler orElse RejectionHandler.default
handleExceptions(sealedExceptionHandler) {
handleRejections(sealedRejectionHandler) {
route
}
}
}
}
object ScalaRoutingDSL extends ScalaRoutingDSL

View file

@ -15,10 +15,5 @@ package object server {
type PathMatcher0 = PathMatcher[Unit]
type PathMatcher1[T] = PathMatcher[Tuple1[T]]
/**
* Helper for constructing a Route from a function literal.
*/
def Route(f: Route): Route = f
def FIXME = throw new RuntimeException("Not yet implemented")
}

View file

@ -3,14 +3,11 @@
*/
package akka.stream.io
import akka.stream.io.StreamTcp.{ TcpServerBinding, IncomingTcpConnection }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.util.ByteString
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import akka.stream.scaladsl._
class TcpFlowSpec extends AkkaSpec with TcpHelper {
@ -26,7 +23,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val tcpReadProbe = new TcpReadProbe()
val tcpWriteProbe = new TcpWriteProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
validateServerClientCommunication(testData, serverConnection, tcpReadProbe, tcpWriteProbe)
@ -42,7 +39,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).via(StreamTcp(system).connect(server.address).flow).to(Sink.ignore).run()
Source(testInput).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink.ignore).run()
val serverConnection = server.waitAccept()
serverConnection.read(256)
@ -57,7 +54,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val idle = new TcpWriteProbe() // Just register an idle upstream
val resultFuture =
Source(idle.publisherProbe)
.via(StreamTcp(system).connect(server.address).flow)
.via(StreamTcp().outgoingConnection(server.address).flow)
.fold(ByteString.empty)((acc, in) acc ++ in)
val serverConnection = server.waitAccept()
@ -76,7 +73,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
tcpWriteProbe.close()
@ -93,7 +90,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
tcpReadProbe.close()
@ -113,7 +110,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
// FIXME: here (and above tests) add a chitChat() method ensuring this works even after prior communication
@ -138,7 +135,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val tcpWriteProbe = new TcpWriteProbe()
val tcpReadProbe = new TcpReadProbe()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run()
val serverConnection = server.waitAccept()
serverConnection.abort()
@ -159,22 +156,20 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val tcpWriteProbe1 = new TcpWriteProbe()
val tcpReadProbe2 = new TcpReadProbe()
val tcpWriteProbe2 = new TcpWriteProbe()
val outgoingFlow = StreamTcp(system).connect(server.address)
val outgoingConnection = StreamTcp().outgoingConnection(server.address)
val mm1 = Source(tcpWriteProbe1.publisherProbe).via(outgoingFlow.flow).to(Sink(tcpReadProbe1.subscriberProbe)).run()
val mm1 = Source(tcpWriteProbe1.publisherProbe).via(outgoingConnection.flow).to(Sink(tcpReadProbe1.subscriberProbe)).run()
val serverConnection1 = server.waitAccept()
val mm2 = Source(tcpWriteProbe2.publisherProbe).via(outgoingFlow.flow).to(Sink(tcpReadProbe2.subscriberProbe)).run()
val mm2 = Source(tcpWriteProbe2.publisherProbe).via(outgoingConnection.flow).to(Sink(tcpReadProbe2.subscriberProbe)).run()
val serverConnection2 = server.waitAccept()
validateServerClientCommunication(testData, serverConnection1, tcpReadProbe1, tcpWriteProbe1)
validateServerClientCommunication(testData, serverConnection2, tcpReadProbe2, tcpWriteProbe2)
// Since we have already communicated over the connections we can have short timeouts for the futures
val outgoingConnection1 = Await.result(mm1.get(outgoingFlow.key), 100 millis)
val outgoingConnection2 = Await.result(mm2.get(outgoingFlow.key), 100 millis)
outgoingConnection1.remoteAddress.getPort should be(server.address.getPort)
outgoingConnection2.remoteAddress.getPort should be(server.address.getPort)
outgoingConnection1.localAddress.getPort should not be (outgoingConnection2.localAddress.getPort)
outgoingConnection.remoteAddress.getPort should be(server.address.getPort)
val localAddress1 = Await.result(outgoingConnection.localAddress(mm1), 100.millis)
val localAddress2 = Await.result(outgoingConnection.localAddress(mm2), 100.millis)
localAddress1.getPort should not be localAddress2.getPort
tcpWriteProbe1.close()
tcpReadProbe1.close()
@ -187,47 +182,39 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
"TCP listen stream" must {
// Reusing handler
val echoHandler = ForeachSink[IncomingTcpConnection] { incoming
incoming.stream.join(Flow.empty).run()
}
val echoHandler = ForeachSink[StreamTcp.IncomingConnection] { _ handleWith Flow[ByteString] }
"be able to implement echo" in {
import system.dispatcher
val serverAddress = temporaryServerAddress
val binding = StreamTcp(system).bind(serverAddress)
val echoServer = binding.to(echoHandler).run()
val binding = StreamTcp().bind(serverAddress)
val echoServerMM = binding.connections.to(echoHandler).run()
val echoServerFinish = echoServer.get(echoHandler)
val echoServerBinding = echoServer.get(binding)
val echoServerFinish = echoServerMM.get(echoHandler)
// make sure that the server has bound to the socket
Await.result(echoServerBinding, 3.seconds)
Await.result(binding.localAddress(echoServerMM), 3.seconds)
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
val resultFuture =
Source(testInput).via(StreamTcp(system).connect(serverAddress).flow).fold(ByteString.empty)((acc, in) acc ++ in)
Source(testInput).via(StreamTcp().outgoingConnection(serverAddress).flow).fold(ByteString.empty)((acc, in) acc ++ in)
Await.result(resultFuture, 3.seconds) should be(expectedOutput)
echoServerBinding.foreach(_.close)
Await.result(echoServerFinish, 3.seconds)
Await.result(binding.unbind(echoServerMM), 3.seconds)
Await.result(echoServerFinish, 1.second)
}
"work with a chain of echoes" in {
import system.dispatcher
val serverAddress = temporaryServerAddress
val binding = StreamTcp(system).bind(serverAddress)
val echoServer = binding.to(echoHandler).run()
val echoServerMM = binding.connections.to(echoHandler).run()
val echoServerFinish = echoServer.get(echoHandler)
val echoServerBinding = echoServer.get(binding)
val echoServerFinish = echoServerMM.get(echoHandler)
// make sure that the server has bound to the socket
Await.result(echoServerBinding, 3.seconds)
Await.result(binding.localAddress(echoServerMM), 3.seconds)
val echoConnection = StreamTcp(system).connect(serverAddress).flow
val echoConnection = StreamTcp().outgoingConnection(serverAddress).flow
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
@ -240,9 +227,9 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
.via(echoConnection)
.fold(ByteString.empty)((acc, in) acc ++ in)
Await.result(resultFuture, 3.seconds) should be(expectedOutput)
echoServerBinding.foreach(_.close)
Await.result(echoServerFinish, 3.seconds)
Await.result(resultFuture, 5.seconds) should be(expectedOutput)
Await.result(binding.unbind(echoServerMM), 3.seconds)
Await.result(echoServerFinish, 1.second)
}
}

View file

@ -3,68 +3,203 @@
*/
package akka.stream.io
import akka.actor._
import akka.io.Inet.SocketOption
import akka.io.Tcp
import akka.pattern.ask
import akka.stream.impl._
import akka.stream.MaterializerSettings
import akka.stream.scaladsl._
import akka.util.{ ByteString, Timeout }
import java.io.Closeable
import java.net.{ InetSocketAddress, URLEncoder }
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.collection._
import scala.concurrent.duration._
import scala.concurrent.{ Promise, ExecutionContext, Future }
import org.reactivestreams.{ Processor, Subscriber, Subscription }
import scala.util.control.NoStackTrace
import scala.util.{ Failure, Success }
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Promise, ExecutionContext, Future }
import akka.util.ByteString
import akka.io.Inet.SocketOption
import akka.io.Tcp
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl._
import akka.stream.impl._
import akka.actor._
object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider {
override def lookup = StreamTcp
override def createExtension(system: ExtendedActorSystem): StreamTcpExt = new StreamTcpExt(system)
override def get(system: ActorSystem): StreamTcpExt = super.get(system)
/**
* The materialized result of an outgoing TCP connection stream.
* Represents a prospective TCP server binding.
*/
case class OutgoingTcpConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)
trait ServerBinding {
/**
* The local address of the endpoint bound by the materialization of the `connections` [[Source]]
* whose [[MaterializedMap]] is passed as parameter.
*/
def localAddress(materializedMap: MaterializedMap): Future[InetSocketAddress]
/**
* A flow representing an outgoing TCP connection, and the key used to get information about the materialized connection.
*/
case class OutgoingTcpFlow(flow: Flow[ByteString, ByteString], key: Key { type MaterializedType = Future[StreamTcp.OutgoingTcpConnection] })
/**
* The stream of accepted incoming connections.
* Can be materialized several times but only one subscription can be "live" at one time, i.e.
* subsequent materializations will reject subscriptions with an [[BindFailedException]] if the previous
* materialization still has an uncancelled subscription.
* Cancelling the subscription to a materialization of this source will cause the listening port to be unbound.
*/
def connections: Source[IncomingConnection]
/**
* The materialized result of a bound server socket.
*/
abstract sealed case class TcpServerBinding(localAddress: InetSocketAddress) extends Closeable
/**
* INTERNAL API
*/
private[akka] object TcpServerBinding {
def apply(localAddress: InetSocketAddress): TcpServerBinding =
new TcpServerBinding(localAddress) {
override def close() = ()
}
def apply(localAddress: InetSocketAddress, closeable: Closeable): TcpServerBinding =
new TcpServerBinding(localAddress) {
override def close() = closeable.close()
}
/**
* Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
* [[Source]] whose [[MaterializedMap]] is passed as parameter.
*
* The produced [[Future]] is fulfilled when the unbinding has been completed.
*/
def unbind(materializedMap: MaterializedMap): Future[Unit]
}
/**
* An incoming TCP connection.
* Represents an accepted incoming TCP connection.
*/
case class IncomingTcpConnection(remoteAddress: InetSocketAddress, stream: Flow[ByteString, ByteString])
trait IncomingConnection {
/**
* The local address this connection is bound to.
*/
def localAddress: InetSocketAddress
/**
* The remote address this connection is bound to.
*/
def remoteAddress: InetSocketAddress
/**
* Handles the connection using the given flow, which is materialized exactly once and the respective
* [[MaterializedMap]] returned.
*
* Convenience shortcut for: `flow.join(handler).run()`.
*/
def handleWith(handler: Flow[ByteString, ByteString])(implicit materializer: FlowMaterializer): MaterializedMap
/**
* A flow representing the client on the other side of the connection.
* This flow can be materialized only once.
*/
def flow: Flow[ByteString, ByteString]
}
/**
* The exception thrown on bind or accept failures.
* Represents a prospective outgoing TCP connection.
*/
class IncomingTcpException(msg: String) extends RuntimeException(msg) with NoStackTrace
sealed trait OutgoingConnection {
/**
* The remote address this connection is or will be bound to.
*/
def remoteAddress: InetSocketAddress
/**
* The local address of the endpoint bound by the materialization of the connection materialization
* whose [[MaterializedMap]] is passed as parameter.
*/
def localAddress(mMap: MaterializedMap): Future[InetSocketAddress]
/**
* Handles the connection using the given flow.
* This method can be called several times, every call will materialize the given flow exactly once thereby
* triggering a new connection attempt to the `remoteAddress`.
* If the connection cannot be established the materialized stream will immediately be terminated
* with a [[ConnectionAttemptFailedException]].
*
* Convenience shortcut for: `flow.join(handler).run()`.
*/
def handleWith(handler: Flow[ByteString, ByteString])(implicit materializer: FlowMaterializer): MaterializedMap
/**
* A flow representing the server on the other side of the connection.
* This flow can be materialized several times, every materialization will open a new connection to the
* `remoteAddress`. If the connection cannot be established the materialized stream will immediately be terminated
* with a [[ConnectionAttemptFailedException]].
*/
def flow: Flow[ByteString, ByteString]
}
case object BindFailedException extends RuntimeException with NoStackTrace
class ConnectionException(message: String) extends RuntimeException(message)
class ConnectionAttemptFailedException(val endpoint: InetSocketAddress) extends ConnectionException(s"Connection attempt to $endpoint failed")
//////////////////// EXTENSION SETUP ///////////////////
def apply()(implicit system: ActorSystem): StreamTcpExt = super.apply(system)
def lookup() = StreamTcp
def createExtension(system: ExtendedActorSystem): StreamTcpExt = new StreamTcpExt(system)
}
class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension {
import StreamTcpExt._
import StreamTcp._
import system.dispatcher
private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager], name = "IO-TCP-STREAM")
/**
* Creates a [[ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
*/
def bind(endpoint: InetSocketAddress,
backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil,
idleTimeout: Duration = Duration.Inf): ServerBinding = {
val connectionSource = new KeyedActorFlowSource[IncomingConnection] {
override type MaterializedType = (Future[InetSocketAddress], Future[() Future[Unit]])
override def attach(flowSubscriber: Subscriber[IncomingConnection],
materializer: ActorBasedFlowMaterializer,
flowName: String): MaterializedType = {
val localAddressPromise = Promise[InetSocketAddress]()
val unbindPromise = Promise[() Future[Unit]]()
manager ! StreamTcpManager.Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options,
idleTimeout)
localAddressPromise.future -> unbindPromise.future
}
}
new ServerBinding {
def localAddress(mm: MaterializedMap) = mm.get(connectionSource)._1
def connections = connectionSource
def unbind(mm: MaterializedMap): Future[Unit] = mm.get(connectionSource)._2.flatMap(_())
}
}
/**
* Creates an [[OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
*/
def outgoingConnection(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil,
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf): OutgoingConnection = {
val remoteAddr = remoteAddress
val key = new PreMaterializedOutgoingKey()
val stream = Pipe(key) { ()
val processorPromise = Promise[Processor[ByteString, ByteString]]()
val localAddressPromise = Promise[InetSocketAddress]()
manager ! StreamTcpManager.Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options,
connectTimeout, idleTimeout)
(new DelayedInitProcessor[ByteString, ByteString](processorPromise.future), localAddressPromise.future)
}
new OutgoingConnection {
def remoteAddress = remoteAddr
def localAddress(mm: MaterializedMap) = mm.get(key)
def flow = stream
def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) =
flow.join(handler).run()
}
}
}
/**
* INTERNAL API
*/
private[akka] object StreamTcpExt {
/**
* INTERNAL API
*/
class PreMaterializedOutgoingKey extends Key {
type MaterializedType = Future[InetSocketAddress]
override def materialize(map: MaterializedMap) =
throw new IllegalStateException("This key has already been materialized by the TCP Processor")
}
}
/**
@ -75,8 +210,8 @@ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[
private val setVarFuture = implFuture.andThen { case Success(p) impl = p }
override def onSubscribe(s: Subscription): Unit = implFuture.onComplete {
case Success(impl) impl.onSubscribe(s)
case Failure(_) s.cancel()
case Success(x) x.onSubscribe(s)
case Failure(_) s.cancel()
}
override def onError(t: Throwable): Unit = {
@ -92,106 +227,11 @@ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[
override def onNext(t: I): Unit = impl.onNext(t)
override def subscribe(s: Subscriber[_ >: O]): Unit = setVarFuture.onComplete {
case Success(impl) impl.subscribe(s)
case Failure(e) s.onError(e)
case Success(x) x.subscribe(s)
case Failure(e) s.onError(e)
}
}
/**
* INTERNAL API
*/
private[akka] object StreamTcpExt {
/**
* INTERNAL API
*/
class PreMaterializedOutgoingKey extends Key {
type MaterializedType = Future[StreamTcp.OutgoingTcpConnection]
override def materialize(map: MaterializedMap) =
throw new IllegalArgumentException("This key have already been materialized by the TCP Processor")
}
}
class StreamTcpExt(val system: ExtendedActorSystem) extends Extension {
import StreamTcpExt._
import StreamTcp._
private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager], name = "IO-TCP-STREAM")
/**
* Creates a Flow that represents a TCP connection to a remote host. The actual connection is only attempted
* when the Flow is materialized. The returned Flow is reusable, each new materialization will attempt to open
* a new connection to the remote host.
*
* @param remoteAddress the address to connect to
* @param localAddress optionally specifies a specific address to bind to
* @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options.
* @param connectTimeout the desired timeout for connection establishment, infinite means "no timeout"
* @param idleTimeout the desired idle timeout on the connection, infinite means "no timeout"
*
*/
def connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil,
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf): OutgoingTcpFlow = {
implicit val t = Timeout(3.seconds)
import system.dispatcher
val key = new PreMaterializedOutgoingKey()
val pipe = Pipe(key) { ()
{
val promise = Promise[OutgoingTcpConnection]
val future = (StreamTcp(system).manager ? StreamTcpManager.Connect(remoteAddress, localAddress, None, options, connectTimeout, idleTimeout))
.mapTo[StreamTcpManager.ConnectReply]
future.map(r OutgoingTcpConnection(r.remoteAddress, r.localAddress)).onComplete(promise.complete(_))
(new DelayedInitProcessor[ByteString, ByteString](future.map(_.processor)), promise.future)
}
}
StreamTcp.OutgoingTcpFlow(pipe, key)
}
/**
* Returns a Source that represents a port listening to incoming connections. The actual binding to the local port
* happens when the Source is first materialized. This Source is not reusable until the listen port becomes available
* again.
*
* @param localAddress the socket address to bind to; use port zero for automatic assignment (i.e. an ephemeral port)
* @param backlog the number of unaccepted connections the O/S
* kernel will hold for this port before refusing connections.
* @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options.
* @param idleTimeout the desired idle timeout on the accepted connections, infinite means "no timeout"
*/
def bind(localAddress: InetSocketAddress,
backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil,
idleTimeout: Duration = Duration.Inf): KeyedSource[IncomingTcpConnection] { type MaterializedType = Future[TcpServerBinding] } = {
new KeyedActorFlowSource[IncomingTcpConnection] {
implicit val t = Timeout(3.seconds)
import system.dispatcher
override def attach(flowSubscriber: Subscriber[IncomingTcpConnection],
materializer: ActorBasedFlowMaterializer,
flowName: String): MaterializedType = {
val bindingFuture = (StreamTcp(system).manager ? StreamTcpManager.Bind(localAddress, None, backlog, options, idleTimeout))
.mapTo[StreamTcpManager.BindReply]
bindingFuture.map(_.connectionStream).onComplete {
case Success(impl) impl.subscribe(flowSubscriber)
case Failure(e) flowSubscriber.onError(e)
}
bindingFuture.map { bf TcpServerBinding(bf.localAddress, bf.closeable) }
}
override type MaterializedType = Future[TcpServerBinding]
}
}
}
/**
* INTERNAL API
*/
@ -199,34 +239,24 @@ private[io] object StreamTcpManager {
/**
* INTERNAL API
*/
private[io] case class ConnectReply(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
processor: Processor[ByteString, ByteString])
private[io] case class Connect(processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress],
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress],
options: immutable.Traversable[SocketOption],
connectTimeout: Duration,
idleTimeout: Duration)
/**
* INTERNAL API
*/
private[io] case class Connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
materializerSettings: Option[MaterializerSettings] = None,
options: immutable.Traversable[SocketOption] = Nil,
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf)
/**
* INTERNAL API
*/
private[io] case class Bind(localAddress: InetSocketAddress,
settings: Option[MaterializerSettings] = None,
backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil,
idleTimeout: Duration = Duration.Inf)
/**
* INTERNAL API
*/
private[io] case class BindReply(localAddress: InetSocketAddress,
connectionStream: Publisher[StreamTcp.IncomingTcpConnection],
closeable: Closeable)
private[io] case class Bind(localAddressPromise: Promise[InetSocketAddress],
unbindPromise: Promise[() Future[Unit]],
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
endpoint: InetSocketAddress,
backlog: Int,
options: immutable.Traversable[SocketOption],
idleTimeout: Duration)
/**
* INTERNAL API
@ -242,32 +272,26 @@ private[akka] class StreamTcpManager extends Actor {
import akka.stream.io.StreamTcpManager._
var nameCounter = 0
def encName(prefix: String, address: InetSocketAddress) = {
def encName(prefix: String, endpoint: InetSocketAddress) = {
nameCounter += 1
s"$prefix-$nameCounter-${URLEncoder.encode(address.toString, "utf-8")}"
s"$prefix-$nameCounter-${URLEncoder.encode(endpoint.toString, "utf-8")}"
}
def receive: Receive = {
case Connect(remoteAddress, localAddress, maybeMaterializerSettings, options, connectTimeout, idleTimeout)
case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, connectTimeout, _)
val connTimeout = connectTimeout match {
case x: FiniteDuration Some(x)
case _ None
}
val materializerSettings = maybeMaterializerSettings getOrElse MaterializerSettings(context.system)
val processorActor = context.actorOf(TcpStreamActor.outboundProps(
val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise,
Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true),
requester = sender(),
settings = materializerSettings), name = encName("client", remoteAddress))
materializerSettings = MaterializerSettings(context.system)), name = encName("client", remoteAddress))
processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor))
case Bind(localAddress, maybeMaterializerSettings, backlog, options, idleTimeout)
val materializerSettings = maybeMaterializerSettings getOrElse MaterializerSettings(context.system)
val publisherActor = context.actorOf(TcpListenStreamActor.props(
Tcp.Bind(context.system.deadLetters, localAddress, backlog, options, pullMode = true),
requester = sender(),
materializerSettings), name = encName("server", localAddress))
case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options, _)
val publisherActor = context.actorOf(TcpListenStreamActor.props(localAddressPromise, unbindPromise,
flowSubscriber, Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true),
MaterializerSettings(context.system)), name = encName("server", endpoint))
// this sends the ExposedPublisher message to the publisher actor automatically
ActorPublisher[Any](publisherActor)
}

View file

@ -3,7 +3,10 @@
*/
package akka.stream.io
import java.net.InetSocketAddress
import akka.io.{ IO, Tcp }
import scala.concurrent.Promise
import scala.util.control.NoStackTrace
import akka.actor.{ ActorRefFactory, Actor, Props, ActorRef, Status }
import akka.stream.impl._
@ -20,8 +23,13 @@ private[akka] object TcpStreamActor {
case object WriteAck extends Tcp.Event
class TcpStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace
def outboundProps(connectCmd: Connect, requester: ActorRef, settings: MaterializerSettings): Props =
Props(new OutboundTcpStreamActor(connectCmd, requester, settings)).withDispatcher(settings.dispatcher)
def outboundProps(processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress],
connectCmd: Connect,
materializerSettings: MaterializerSettings): Props =
Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, connectCmd,
materializerSettings)).withDispatcher(materializerSettings.dispatcher)
def inboundProps(connection: ActorRef, settings: MaterializerSettings): Props =
Props(new InboundTcpStreamActor(connection, settings)).withDispatcher(settings.dispatcher)
}
@ -201,7 +209,9 @@ private[akka] class InboundTcpStreamActor(
/**
* INTERNAL API
*/
private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requester: ActorRef, _settings: MaterializerSettings)
private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress],
val connectCmd: Connect, _settings: MaterializerSettings)
extends TcpStreamActor(_settings) {
import TcpStreamActor._
import context.system
@ -222,11 +232,14 @@ private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requeste
connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
tcpOutputs.setConnection(connection)
tcpInputs.setConnection(connection)
requester ! StreamTcpManager.ConnectReply(remoteAddress, localAddress, exposedProcessor)
localAddressPromise.success(localAddress)
processorPromise.success(exposedProcessor)
initSteps.become(Actor.emptyBehavior)
case f: CommandFailed
val ex = new TcpStreamException("Connection failed.")
requester ! Status.Failure(ex)
localAddressPromise.failure(ex)
processorPromise.failure(ex)
fail(ex)
}
}

View file

@ -3,35 +3,38 @@
*/
package akka.stream.io
import java.io.Closeable
import akka.actor._
import java.net.InetSocketAddress
import akka.stream.io.StreamTcp.ConnectionException
import org.reactivestreams.Subscriber
import scala.concurrent.{ Future, Promise }
import akka.util.ByteString
import akka.io.Tcp._
import akka.io.{ IO, Tcp }
import akka.stream.MaterializerSettings
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.{ Flow, Pipe }
import akka.stream.impl._
import akka.stream.scaladsl.{ Pipe, Flow }
import akka.util.ByteString
import org.reactivestreams.{ Processor, Publisher }
import scala.util.control.NoStackTrace
import akka.actor._
/**
* INTERNAL API
*/
private[akka] object TcpListenStreamActor {
def props(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings): Props = {
Props(new TcpListenStreamActor(bindCmd, requester, settings))
def props(localAddressPromise: Promise[InetSocketAddress],
unbindPromise: Promise[() Future[Unit]],
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
bindCmd: Tcp.Bind, materializerSettings: MaterializerSettings): Props = {
Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, bindCmd, materializerSettings))
}
}
/**
* INTERNAL API
*/
private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings) extends Actor
private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocketAddress],
unbindPromise: Promise[() Future[Unit]],
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
bindCmd: Tcp.Bind, settings: MaterializerSettings) extends Actor
with Pump with Stash {
import akka.stream.io.TcpListenStreamActor._
import context.system
object primaryOutputs extends SimpleOutputs(self, pump = this) {
@ -48,7 +51,9 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef,
def getExposedPublisher = exposedPublisher
}
private val unboundPromise = Promise[Unit]()
private var finished = false
override protected def pumpFinished(): Unit = {
if (!finished) {
finished = true
@ -71,16 +76,15 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef,
nextPhase(runningPhase)
listener ! ResumeAccepting(1)
val target = self
requester ! StreamTcpManager.BindReply(
localAddress,
primaryOutputs.getExposedPublisher.asInstanceOf[Publisher[StreamTcp.IncomingTcpConnection]],
new Closeable {
override def close() = target ! Unbind
})
localAddressPromise.success(localAddress)
unbindPromise.success(() { target ! Unbind; unboundPromise.future })
primaryOutputs.getExposedPublisher.subscribe(flowSubscriber.asInstanceOf[Subscriber[Any]])
subreceive.become(running)
case f: CommandFailed
val ex = new StreamTcp.IncomingTcpException("Bind failed")
requester ! Status.Failure(ex)
val ex = StreamTcp.BindFailedException
localAddressPromise.failure(ex)
unbindPromise.failure(ex)
flowSubscriber.onError(ex)
fail(ex)
}
@ -89,12 +93,16 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef,
pendingConnection = (c, sender())
pump()
case f: CommandFailed
fail(new StreamTcp.IncomingTcpException(s"Command [${f.cmd}] failed"))
val ex = new ConnectionException(s"Command [${f.cmd}] failed")
unbindPromise.tryFailure(ex)
fail(ex)
case Unbind
cancel()
if (!closed && listener != null) listener ! Unbind
listener = null
pump()
case Unbound // If we're unbound then just shut down
closed = true
cancel()
unboundPromise.trySuccess(())
pump()
}
@ -114,7 +122,6 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef,
listener ! ResumeAccepting(1)
elem
}
}
final override def receive = {
@ -132,7 +139,19 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef,
val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement()
val tcpStreamActor = context.actorOf(TcpStreamActor.inboundProps(connection, settings))
val processor = ActorProcessor[ByteString, ByteString](tcpStreamActor)
primaryOutputs.enqueueOutputElement(StreamTcp.IncomingTcpConnection(connected.remoteAddress, Pipe(() processor)))
val conn = new StreamTcp.IncomingConnection {
val flow = Pipe(() processor)
def localAddress = connected.localAddress
def remoteAddress = connected.remoteAddress
def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) =
flow.join(handler).run()
}
primaryOutputs.enqueueOutputElement(conn)
}
override def postStop(): Unit = {
unboundPromise.trySuccess(())
super.postStop()
}
def fail(e: Throwable): Unit = {