diff --git a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala index dbf5e6fa9d..10e84f1f1e 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala @@ -6,6 +6,7 @@ package docs.http import akka.actor.ActorSystem import akka.http.model._ +import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec class HttpServerExampleSpec @@ -20,11 +21,9 @@ class HttpServerExampleSpec implicit val system = ActorSystem() implicit val materializer = FlowMaterializer() - val Http.ServerSource(source, serverBindingKey) = Http(system).bind(interface = "localhost", port = 8080) - source.foreach { - case Http.IncomingConnection(remoteAddress, flow) ⇒ - println("Accepted new connection from " + remoteAddress) - + val serverBinding = Http(system).bind(interface = "localhost", port = 8080) + for (connection <- serverBinding.connections) { + println("Accepted new connection from " + connection.remoteAddress) // handle connection here } //#bind-example @@ -37,7 +36,7 @@ class HttpServerExampleSpec implicit val system = ActorSystem() implicit val materializer = FlowMaterializer() - val Http.ServerSource(source, serverBindingKey) = Http(system).bind(interface = "localhost", port = 8080) + val serverBinding = Http(system).bind(interface = "localhost", port = 8080) //#full-server-example import akka.http.model.HttpMethods._ @@ -54,13 +53,10 @@ class HttpServerExampleSpec case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") } - // ... + serverBinding.connections foreach { connection => + println("Accepted new connection from " + connection.remoteAddress) - source.foreach { - case Http.IncomingConnection(remoteAddress, flow) ⇒ - println("Accepted new connection from " + remoteAddress) - - flow.join(Flow[HttpRequest].map(requestHandler)).run() + connection handleWith { Flow[HttpRequest] map requestHandler } } //#full-server-example } diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index bd88d2e5fb..b07f3a5ba6 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -163,8 +163,4 @@ akka.http { User-Agent = 32 } } - - # Fully qualified config path which holds the dispatcher configuration - # to be used for the HttpManager. - manager-dispatcher = "akka.actor.default-dispatcher" } diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index a4457fb069..7ba141a2d5 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -4,109 +4,218 @@ package akka.http -import java.io.Closeable import java.net.InetSocketAddress +import com.typesafe.config.Config +import scala.collection.immutable +import scala.concurrent.Future +import akka.event.LoggingAdapter +import akka.util.ByteString +import akka.io.Inet +import akka.stream.FlowMaterializer import akka.stream.io.StreamTcp import akka.stream.scaladsl._ -import scala.collection.immutable -import akka.io.Inet -import akka.http.engine.client.{ HttpClientPipeline, ClientConnectionSettings } -import akka.http.engine.server.{ HttpServerPipeline, ServerSettings } +import akka.http.engine.client.{ HttpClient, ClientConnectionSettings } +import akka.http.engine.server.{ HttpServer, ServerSettings } import akka.http.model.{ ErrorInfo, HttpResponse, HttpRequest } import akka.actor._ -import scala.concurrent.Future - -object Http extends ExtensionKey[HttpExt] with ExtensionIdProvider { +class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.Extension { + import Http._ /** - * A flow representing an outgoing HTTP connection, and the key used to get information about - * the materialized connection. The flow takes pairs of a ``HttpRequest`` and a user definable - * context that will be correlated with the corresponding ``HttpResponse``. + * Creates a [[ServerBinding]] instance which represents a prospective HTTP server binding on the given `endpoint`. */ - final case class OutgoingFlow(flow: Flow[(HttpRequest, Any), (HttpResponse, Any)], - key: Key { type MaterializedType = Future[Http.OutgoingConnection] }) + def bind(interface: String, port: Int = 80, backlog: Int = 100, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: Option[ServerSettings] = None, + log: LoggingAdapter = system.log): ServerBinding = { + val endpoint = new InetSocketAddress(interface, port) + val effectiveSettings = ServerSettings(settings) + val tcpBinding = StreamTcp().bind(endpoint, backlog, options, effectiveSettings.timeouts.idleTimeout) + new ServerBinding { + def localAddress(mm: MaterializedMap) = tcpBinding.localAddress(mm) + val connections = tcpBinding.connections map { tcpConn ⇒ + new IncomingConnection { + def localAddress = tcpConn.localAddress + def remoteAddress = tcpConn.remoteAddress + def handleWith(handler: Flow[HttpRequest, HttpResponse])(implicit fm: FlowMaterializer) = + tcpConn.handleWith(HttpServer.serverFlowToTransport(handler, effectiveSettings, log)) + } + } + def unbind(mm: MaterializedMap): Future[Unit] = tcpBinding.unbind(mm) + } + } /** - * The materialized result of an outgoing HTTP connection stream with a single connection as the underlying transport. + * Transforms a given HTTP-level server [[Flow]] into a lower-level TCP transport flow. */ - final case class OutgoingConnection(remoteAddress: InetSocketAddress, - localAddress: InetSocketAddress) + def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse], + settings: Option[ServerSettings] = None, + log: LoggingAdapter = system.log): Flow[ByteString, ByteString] = { + val effectiveSettings = ServerSettings(settings) + HttpServer.serverFlowToTransport(serverFlow, effectiveSettings, log) + } /** - * A source representing an bound HTTP server socket, and the key to get information about - * the materialized bound socket. + * Creates an [[OutgoingConnection]] instance representing a prospective HTTP client connection to the given endpoint. */ - final case class ServerSource(source: Source[IncomingConnection], - key: Key { type MaterializedType = Future[ServerBinding] }) + def outgoingConnection(host: String, port: Int = 80, + localAddress: Option[InetSocketAddress] = None, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: Option[ClientConnectionSettings] = None, + log: LoggingAdapter = system.log): OutgoingConnection = { + val effectiveSettings = ClientConnectionSettings(settings) + val remoteAddr = new InetSocketAddress(host, port) + val transportFlow = StreamTcp().outgoingConnection(remoteAddr, localAddress, + options, effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout) + new OutgoingConnection { + def remoteAddress = remoteAddr + def localAddress(mm: MaterializedMap) = transportFlow.localAddress(mm) + val flow = HttpClient.transportToConnectionClientFlow(transportFlow.flow, remoteAddr, effectiveSettings, log) + } + } /** - * An incoming HTTP connection. + * Transforms the given low-level TCP client transport [[Flow]] into a higher-level HTTP client flow. */ - final case class IncomingConnection(remoteAddress: InetSocketAddress, stream: Flow[HttpResponse, HttpRequest]) + def transportToConnectionClientFlow(transport: Flow[ByteString, ByteString], + remoteAddress: InetSocketAddress, // TODO: removed after #16168 is cleared + settings: Option[ClientConnectionSettings] = None, + log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse] = { + val effectiveSettings = ClientConnectionSettings(settings) + HttpClient.transportToConnectionClientFlow(transport, remoteAddress, effectiveSettings, log) + } +} + +object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { + + /** + * Represents a prospective HTTP server binding. + */ + trait ServerBinding { + /** + * The local address of the endpoint bound by the materialization of the `connections` [[Source]] + * whose [[MaterializedMap]] is passed as parameter. + */ + def localAddress(materializedMap: MaterializedMap): Future[InetSocketAddress] + + /** + * The stream of accepted incoming connections. + * Can be materialized several times but only one subscription can be "live" at one time, i.e. + * subsequent materializations will reject subscriptions with an [[StreamTcp.BindFailedException]] if the previous + * materialization still has an uncancelled subscription. + * Cancelling the subscription to a materialization of this source will cause the listening port to be unbound. + */ + def connections: Source[IncomingConnection] + + /** + * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections` + * [[Source]] whose [[MaterializedMap]] is passed as parameter. + * + * The produced [[Future]] is fulfilled when the unbinding has been completed. + */ + def unbind(materializedMap: MaterializedMap): Future[Unit] + + /** + * Materializes the `connections` [[Source]] and handles all connections with the given flow. + * + * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all + * connections are being accepted at maximum rate, which, depending on the applications, might + * present a DoS risk! + */ + def startHandlingWith(handler: Flow[HttpRequest, HttpResponse])(implicit fm: FlowMaterializer): MaterializedMap = + connections.to(ForeachSink(_ handleWith handler)).run() + + /** + * Materializes the `connections` [[Source]] and handles all connections with the given flow. + * + * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all + * connections are being accepted at maximum rate, which, depending on the applications, might + * present a DoS risk! + */ + def startHandlingWithSyncHandler(handler: HttpRequest ⇒ HttpResponse)(implicit fm: FlowMaterializer): MaterializedMap = + startHandlingWith(Flow[HttpRequest].map(handler)) + + /** + * Materializes the `connections` [[Source]] and handles all connections with the given flow. + * + * Note that there is no backpressure being applied to the `connections` [[Source]], i.e. all + * connections are being accepted at maximum rate, which, depending on the applications, might + * present a DoS risk! + */ + def startHandlingWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse])(implicit fm: FlowMaterializer): MaterializedMap = + startHandlingWith(Flow[HttpRequest].mapAsync(handler)) + } + + /** + * Represents one accepted incoming HTTP connection. + */ + sealed trait IncomingConnection { + /** + * The local address this connection is bound to. + */ + def localAddress: InetSocketAddress + + /** + * The remote address this connection is bound to. + */ + def remoteAddress: InetSocketAddress + + /** + * Handles the connection with the given flow, which is materialized exactly once + * and the respective [[MaterializedMap]] returned. + */ + def handleWith(handler: Flow[HttpRequest, HttpResponse])(implicit fm: FlowMaterializer): MaterializedMap + + /** + * Handles the connection with the given handler function. + * Returns the [[MaterializedMap]] of the underlying flow materialization. + */ + def handleWithSyncHandler(handler: HttpRequest ⇒ HttpResponse)(implicit fm: FlowMaterializer): MaterializedMap = + handleWith(Flow[HttpRequest].map(handler)) + + /** + * Handles the connection with the given handler function. + * Returns the [[MaterializedMap]] of the underlying flow materialization. + */ + def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse])(implicit fm: FlowMaterializer): MaterializedMap = + handleWith(Flow[HttpRequest].mapAsync(handler)) + } + + /** + * Represents a prospective outgoing HTTP connection. + */ + sealed trait OutgoingConnection { + /** + * The remote address this connection is or will be bound to. + */ + def remoteAddress: InetSocketAddress + + /** + * The local address of the endpoint bound by the materialization of the connection materialization + * whose [[MaterializedMap]] is passed as parameter. + */ + def localAddress(mMap: MaterializedMap): Future[InetSocketAddress] + + /** + * A flow representing the HTTP server on a single HTTP connection. + * This flow can be materialized several times, every materialization will open a new connection to the `remoteAddress`. + * If the connection cannot be established the materialized stream will immediately be terminated + * with a [[StreamTcp.ConnectionAttemptFailedException]]. + */ + def flow: Flow[HttpRequest, HttpResponse] + } + + class RequestTimeoutException(val request: HttpRequest, message: String) extends RuntimeException(message) class StreamException(val info: ErrorInfo) extends RuntimeException(info.summary) - /** - * The materialized result of a bound HTTP server socket. - */ - private[akka] sealed abstract case class ServerBinding(localAddress: InetSocketAddress) extends Closeable + //////////////////// EXTENSION SETUP /////////////////// - /** - * INTERNAL API - */ - private[akka] object ServerBinding { - def apply(localAddress: InetSocketAddress, closeable: Closeable): ServerBinding = - new ServerBinding(localAddress) { - override def close() = closeable.close() - } - } -} + def apply()(implicit system: ActorSystem): HttpExt = super.apply(system) -class HttpExt(system: ExtendedActorSystem) extends Extension { - @volatile private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline] + def lookup() = Http - def connect(remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress], - options: immutable.Traversable[Inet.SocketOption], - settings: Option[ClientConnectionSettings]): Http.OutgoingFlow = { - // FIXME #16378 Where to do logging? log.debug("Attempting connection to {}", remoteAddress) - val effectiveSettings = ClientConnectionSettings(settings)(system) - - val tcpFlow = StreamTcp(system).connect(remoteAddress, localAddress, options, effectiveSettings.connectingTimeout) - val pipeline = clientPipelines.getOrElse(effectiveSettings, { - val pl = new HttpClientPipeline(effectiveSettings, system.log)(system.dispatcher) - clientPipelines = clientPipelines.updated(effectiveSettings, pl) - pl - }) - pipeline(tcpFlow, remoteAddress) - } - - def connect(host: String, port: Int = 80, - localAddress: Option[InetSocketAddress] = None, - options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: Option[ClientConnectionSettings] = None): Http.OutgoingFlow = - connect(new InetSocketAddress(host, port), localAddress, options, settings) - - def bind(endpoint: InetSocketAddress, - backlog: Int, - options: immutable.Traversable[Inet.SocketOption], - serverSettings: Option[ServerSettings]): Http.ServerSource = { - import system.dispatcher - - // FIXME IdleTimeout? - val src = StreamTcp(system).bind(endpoint, backlog, options) - val key = new Key { - override type MaterializedType = Future[Http.ServerBinding] - override def materialize(map: MaterializedMap) = map.get(src).map(s ⇒ Http.ServerBinding(s.localAddress, s)) - } - val log = system.log - val effectiveSettings = ServerSettings(serverSettings)(system) - Http.ServerSource(src.withKey(key).map(new HttpServerPipeline(effectiveSettings, log)), key) - } - - def bind(interface: String, port: Int = 80, backlog: Int = 100, - options: immutable.Traversable[Inet.SocketOption] = Nil, - serverSettings: Option[ServerSettings] = None): Http.ServerSource = - bind(new InetSocketAddress(interface, port), backlog, options, serverSettings) -} + def createExtension(system: ExtendedActorSystem): HttpExt = + new HttpExt(system.settings.config getConfig "akka.http")(system) +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala new file mode 100644 index 0000000000..715ce81fbd --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import java.net.InetSocketAddress +import scala.collection.immutable.Queue +import akka.util.ByteString +import akka.event.LoggingAdapter +import akka.stream.FlattenStrategy +import akka.stream.scaladsl._ +import akka.http.model.{ HttpMethod, HttpRequest, HttpResponse } +import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } +import akka.http.engine.parsing.{ HttpHeaderParser, HttpResponseParser } +import akka.http.engine.parsing.ParserOutput._ +import akka.http.util._ + +/** + * INTERNAL API + */ +private[http] object HttpClient { + + def transportToConnectionClientFlow(transport: Flow[ByteString, ByteString], + remoteAddress: InetSocketAddress, + settings: ClientConnectionSettings, + log: LoggingAdapter): Flow[HttpRequest, HttpResponse] = { + import settings._ + + // the initial header parser we initially use for every connection, + // will not be mutated, all "shared copy" parsers copy on first-write into the header cache + val rootParser = new HttpResponseParser( + parserSettings, + HttpHeaderParser(parserSettings) { errorInfo ⇒ + if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty) + }) + + val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log) + val requestMethodByPass = new RequestMethodByPass(remoteAddress) + + Flow[HttpRequest] + .map(requestMethodByPass) + .transform("renderer", () ⇒ requestRendererFactory.newRenderer) + .flatten(FlattenStrategy.concat) + .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing request stream error")) + .via(transport) + .transform("rootParser", () ⇒ + // each connection uses a single (private) response parser instance for all its responses + // which builds a cache of all header instances seen on that connection + rootParser.createShallowCopy(requestMethodByPass)) + .splitWhen(_.isInstanceOf[MessageStart]) + .headAndTail + .collect { + case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ + HttpResponse(statusCode, headers, createEntity(entityParts), protocol) + } + } + + // FIXME: refactor to a pure-stream design that allows us to get rid of this ad-hoc queue here + class RequestMethodByPass(serverAddress: InetSocketAddress) + extends (HttpRequest ⇒ RequestRenderingContext) with (() ⇒ HttpMethod) { + private[this] var requestMethods = Queue.empty[HttpMethod] + def apply(request: HttpRequest) = { + requestMethods = requestMethods.enqueue(request.method) + RequestRenderingContext(request, serverAddress) + } + def apply(): HttpMethod = + if (requestMethods.nonEmpty) { + val method = requestMethods.head + requestMethods = requestMethods.tail + method + } else HttpResponseParser.NoMethod + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala deleted file mode 100644 index f0ca8e73e3..0000000000 --- a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.engine.client - -import java.net.InetSocketAddress -import scala.collection.immutable.Queue -import akka.stream.scaladsl._ -import akka.event.LoggingAdapter -import akka.stream.FlattenStrategy -import akka.stream.io.StreamTcp -import akka.util.ByteString -import akka.http.Http -import akka.http.model.{ HttpMethod, HttpRequest, ErrorInfo, HttpResponse } -import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } -import akka.http.engine.parsing.{ HttpRequestParser, HttpHeaderParser, HttpResponseParser } -import akka.http.engine.parsing.ParserOutput._ -import akka.http.util._ - -import scala.concurrent.{ ExecutionContext, Future } - -/** - * INTERNAL API - */ -private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettings, - log: LoggingAdapter)(implicit ec: ExecutionContext) - extends ((StreamTcp.OutgoingTcpFlow, InetSocketAddress) ⇒ Http.OutgoingFlow) { - - import effectiveSettings._ - - // the initial header parser we initially use for every connection, - // will not be mutated, all "shared copy" parsers copy on first-write into the header cache - val rootParser = new HttpResponseParser( - parserSettings, - HttpHeaderParser(parserSettings) { errorInfo ⇒ - if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty) - }) - - val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log) - - def apply(tcpFlow: StreamTcp.OutgoingTcpFlow, remoteAddress: InetSocketAddress): Http.OutgoingFlow = { - import FlowGraphImplicits._ - - val httpKey = new HttpKey(tcpFlow.key) - - val flowWithHttpKey = tcpFlow.flow.withKey(httpKey) - - val requestMethodByPass = new RequestMethodByPass(remoteAddress) - - val pipeline = Flow() { implicit b ⇒ - val userIn = UndefinedSource[(HttpRequest, Any)] - val userOut = UndefinedSink[(HttpResponse, Any)] - - val bypassFanout = Unzip[HttpRequest, Any]("bypassFanout") - val bypassFanin = Zip[HttpResponse, Any]("bypassFanin") - - val requestPipeline = - Flow[HttpRequest] - .map(requestMethodByPass) - .transform("renderer", () ⇒ requestRendererFactory.newRenderer) - .flatten(FlattenStrategy.concat) - .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing request stream error")) - - val responsePipeline = - Flow[ByteString] - .transform("rootParser", () ⇒ - // each connection uses a single (private) response parser instance for all its responses - // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(requestMethodByPass)) - .splitWhen(_.isInstanceOf[MessageStart]) - .headAndTail - .collect { - case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ - HttpResponse(statusCode, headers, createEntity(entityParts), protocol) - } - - //FIXME: the graph is unnecessary after fixing #15957 - userIn ~> bypassFanout.in - bypassFanout.left ~> requestPipeline ~> flowWithHttpKey ~> responsePipeline ~> bypassFanin.left - bypassFanout.right ~> bypassFanin.right - bypassFanin.out ~> userOut - - userIn -> userOut - } - - Http.OutgoingFlow(pipeline, httpKey) - } - - class RequestMethodByPass(serverAddress: InetSocketAddress) - extends ((HttpRequest) ⇒ RequestRenderingContext) with (() ⇒ HttpMethod) { - private[this] var requestMethods = Queue.empty[HttpMethod] - def apply(request: HttpRequest) = { - requestMethods = requestMethods.enqueue(request.method) - RequestRenderingContext(request, serverAddress) - } - def apply(): HttpMethod = - if (requestMethods.nonEmpty) { - val method = requestMethods.head - requestMethods = requestMethods.tail - method - } else HttpResponseParser.NoMethod - } - - class HttpKey(tcpKey: Key { type MaterializedType = Future[StreamTcp.OutgoingTcpConnection] }) extends Key { - type MaterializedType = Future[Http.OutgoingConnection] - - override def materialize(map: MaterializedMap): MaterializedType = - map.get(tcpKey).map(tcp ⇒ Http.OutgoingConnection(tcp.remoteAddress, tcp.localAddress)) - } -} diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientProcessor.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientProcessor.scala deleted file mode 100644 index ac0cddd68f..0000000000 --- a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientProcessor.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.engine.client - -import akka.http.model.{ HttpResponse, HttpRequest } -import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } - -/** - * A `HttpClientProcessor` models an HTTP client as a stream processor that provides - * responses for requests with an attached context object of a custom type, - * which is funneled through and completely transparent to the processor itself. - */ -trait HttpClientProcessor[T] extends Processor[(HttpRequest, T), (HttpResponse, T)] - -object HttpClientProcessor { - def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)], - responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] = - new HttpClientProcessor[T] { - override def subscribe(s: Subscriber[_ >: (HttpResponse, T)]): Unit = responsePublisher.subscribe(s) - - override def onError(t: Throwable): Unit = requestSubscriber.onError(t) - override def onSubscribe(s: Subscription): Unit = requestSubscriber.onSubscribe(s) - override def onComplete(): Unit = requestSubscriber.onComplete() - override def onNext(t: (HttpRequest, T)): Unit = requestSubscriber.onNext(t) - } -} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala similarity index 63% rename from akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala rename to akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index af94c742f5..7974aaf586 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -4,68 +4,40 @@ package akka.http.engine.server -import akka.actor.{ Props, ActorRef } +import akka.actor.{ ActorRef, Props } +import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream.stage.PushPullStage -import akka.util.ByteString -import akka.stream.io.StreamTcp import akka.stream.FlattenStrategy -import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser } import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } -import akka.http.model._ import akka.http.engine.parsing.ParserOutput._ -import akka.http.Http +import akka.http.model._ import akka.http.util._ +import akka.http.Http import scala.util.control.NonFatal /** * INTERNAL API */ -private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAdapter) - extends (StreamTcp.IncomingTcpConnection ⇒ Http.IncomingConnection) { - import settings.parserSettings +private[http] object HttpServer { - // the initial header parser we initially use for every connection, - // will not be mutated, all "shared copy" parsers copy on first-write into the header cache - val rootParser = new HttpRequestParser( - parserSettings, - settings.rawRequestUriHeader, - HttpHeaderParser(parserSettings) { errorInfo ⇒ - if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) - }) + def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse], + settings: ServerSettings, + log: LoggingAdapter): Flow[ByteString, ByteString] = { - val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log) + // the initial header parser we initially use for every connection, + // will not be mutated, all "shared copy" parsers copy on first-write into the header cache + val rootParser = new HttpRequestParser( + settings.parserSettings, + settings.rawRequestUriHeader, + HttpHeaderParser(settings.parserSettings) { errorInfo ⇒ + if (settings.parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) + }) - val bypassFanout = Broadcast[RequestOutput]("bypassFanout") - - val bypassMerge = new BypassMerge - - val requestPreparation = - Flow[RequestOutput] - .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .headAndTail - .collect { - case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts) ⇒ - val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) - val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method - HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) - } - - val rendererPipeline = - Flow[ResponseRenderingContext] - .transform("recover", () ⇒ new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed - .transform("renderer", () ⇒ responseRendererFactory.newRenderer) - .flatten(FlattenStrategy.concat) - .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) - - def apply(tcpConn: StreamTcp.IncomingTcpConnection): Http.IncomingConnection = { - import FlowGraphImplicits._ - - val userIn = UndefinedSink[HttpRequest] - val userOut = UndefinedSource[HttpResponse] + val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log) @volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168 val oneHundredContinueSource = Source[OneHundredContinue.type] { @@ -76,28 +48,50 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda } } - // FIXME The whole pipeline can maybe be created up front when #16168 is fixed - val pipeline = Flow() { implicit b ⇒ + val bypassFanout = Broadcast[RequestOutput]("bypassFanout") + val bypassMerge = new BypassMerge(settings, log) - val requestParsing = Flow[ByteString].transform("rootParser", () ⇒ - // each connection uses a single (private) request parser instance for all its requests - // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(() ⇒ oneHundredContinueRef)) + val requestParsing = Flow[ByteString].transform("rootParser", () ⇒ + // each connection uses a single (private) request parser instance for all its requests + // which builds a cache of all header instances seen on that connection + rootParser.createShallowCopy(() ⇒ oneHundredContinueRef)) + val requestPreparation = + Flow[RequestOutput] + .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) + .headAndTail + .collect { + case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts) ⇒ + val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) + val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method + HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) + } + + val rendererPipeline = + Flow[ResponseRenderingContext] + .transform("recover", () ⇒ new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed + .transform("renderer", () ⇒ responseRendererFactory.newRenderer) + .flatten(FlattenStrategy.concat) + .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) + + val transportIn = UndefinedSource[ByteString] + val transportOut = UndefinedSink[ByteString] + + import FlowGraphImplicits._ + + Flow() { implicit b ⇒ //FIXME: the graph is unnecessary after fixing #15957 - userOut ~> bypassMerge.applicationInput ~> rendererPipeline ~> tcpConn.stream ~> requestParsing ~> bypassFanout ~> requestPreparation ~> userIn + transportIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> serverFlow ~> bypassMerge.applicationInput ~> rendererPipeline ~> transportOut bypassFanout ~> bypassMerge.bypassInput oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput b.allowCycles() - userOut -> userIn + transportIn -> transportOut } - - Http.IncomingConnection(tcpConn.remoteAddress, pipeline) } - class BypassMerge extends FlexiMerge[ResponseRenderingContext]("BypassMerge") { + class BypassMerge(settings: ServerSettings, log: LoggingAdapter) extends FlexiMerge[ResponseRenderingContext]("BypassMerge") { import FlexiMerge._ val bypassInput = createInputPort[RequestOutput]() val oneHundredContinueInput = createInputPort[OneHundredContinue.type]() diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index da98db5163..25be0cf585 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -12,6 +12,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.actor.ActorSystem +import akka.stream.io.StreamTcp import akka.stream.FlowMaterializer import akka.stream.testkit.StreamTestKit import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } @@ -36,45 +37,39 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { "The server-side HTTP infrastructure" should { - "properly bind and unbind a server" in { + "properly bind a server" in { val (hostname, port) = temporaryServerHostnameAndPort() - val Http.ServerSource(source, key) = Http(system).bind(hostname, port) - val c = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() - val mm = source.to(Sink(c)).run() - val Http.ServerBinding(localAddress) = Await.result(mm.get(key), 3.seconds) - val sub = c.expectSubscription() - localAddress.getHostName shouldEqual hostname - localAddress.getPort shouldEqual port - + val binding = Http().bind(hostname, port) + val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + binding.connections.runWith(Sink(probe)) + val sub = probe.expectSubscription() // if we get it we are bound sub.cancel() - - // TODO: verify unbinding effect } - "report failure if bind fails" in { + "report failure if bind fails" in pendingUntilFixed { // FIXME: "unpend"! val (hostname, port) = temporaryServerHostnameAndPort() - val Http.ServerSource(source, key) = Http(system).bind(hostname, port) - val c1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() - val mm1 = source.to(Sink(c1)).run() - val sub = c1.expectSubscription() - val Http.ServerBinding(localAddress) = Await.result(mm1.get(key), 3.seconds) - localAddress.getHostName shouldEqual hostname - localAddress.getPort shouldEqual port - val c2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() - val mm2 = source.to(Sink(c2)).run() - val failure = intercept[akka.stream.io.StreamTcp.IncomingTcpException] { - val serverBinding = Await.result(mm2.get(key), 3.seconds) - } - failure.getMessage should be("Bind failed") - sub.cancel() + val binding = Http().bind(hostname, port) + val probe1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + val mm1 = binding.connections.to(Sink(probe1)).run() + probe1.expectSubscription() + + val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + binding.connections.runWith(Sink(probe2)) + probe2.expectError(StreamTcp.BindFailedException) + + Await.result(binding.unbind(mm1), 1.second) + val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + val mm3 = binding.connections.to(Sink(probe3)).run() + probe3.expectSubscription() // we bound a second time, which means the previous unbind was successful + Await.result(binding.unbind(mm3), 1.second) } "properly complete a simple request/response cycle" in new TestSetup { - val (clientOut, clientIn) = openNewClientConnection[Symbol]() + val (clientOut, clientIn) = openNewClientConnection() val (serverIn, serverOut) = acceptConnection() val clientOutSub = clientOut.expectSubscription() - clientOutSub.sendNext(HttpRequest(uri = "/abc") -> 'abcContext) + clientOutSub.sendNext(HttpRequest(uri = "/abc")) val serverInSub = serverIn.expectSubscription() serverInSub.request(1) @@ -85,12 +80,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val clientInSub = clientIn.expectSubscription() clientInSub.request(1) - val (response, 'abcContext) = clientIn.expectNext() + val response = clientIn.expectNext() toStrict(response.entity) shouldEqual HttpEntity("yeah") } "properly complete a chunked request/response cycle" in new TestSetup { - val (clientOut, clientIn) = openNewClientConnection[Long]() + val (clientOut, clientIn) = openNewClientConnection() val (serverIn, serverOut) = acceptConnection() val chunks = List(Chunk("abc"), Chunk("defg"), Chunk("hijkl"), LastChunk) @@ -98,7 +93,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val chunkedEntity = HttpEntity.Chunked(chunkedContentType, Source(chunks)) val clientOutSub = clientOut.expectSubscription() - clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity) -> 12345678) + clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity)) val serverInSub = serverIn.expectSubscription() serverInSub.request(1) @@ -112,8 +107,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val clientInSub = clientIn.expectSubscription() clientInSub.request(1) - val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")), - Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext() + val HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")), + Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext() Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks } @@ -126,35 +121,35 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { def configOverrides = "" // automatically bind a server - val connectionStream: SubscriberProbe[Http.IncomingConnection] = { + val connSource = { val settings = configOverrides.toOption.map(ServerSettings.apply) - val Http.ServerSource(source, key) = Http(system).bind(hostname, port, serverSettings = settings) + val binding = Http().bind(hostname, port, settings = settings) val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection] - source.to(Sink(probe)).run() + binding.connections.runWith(Sink(probe)) probe } - val connectionStreamSub = connectionStream.expectSubscription() + val connSourceSub = connSource.expectSubscription() - def openNewClientConnection[T](settings: Option[ClientConnectionSettings] = None): (PublisherProbe[(HttpRequest, T)], SubscriberProbe[(HttpResponse, T)]) = { - val outgoingFlow = Http(system).connect(hostname, port, settings = settings) - val requestPublisherProbe = StreamTestKit.PublisherProbe[(HttpRequest, T)]() - val responseSubscriberProbe = StreamTestKit.SubscriberProbe[(HttpResponse, T)]() - val tflow = outgoingFlow.flow.asInstanceOf[Flow[((HttpRequest, T)), ((HttpResponse, T))]] - val mm = Flow(Sink(responseSubscriberProbe), Source(requestPublisherProbe)).join(tflow).run() - val connection = Await.result(mm.get(outgoingFlow.key), 3.seconds) - connection.remoteAddress.getPort shouldEqual port + def openNewClientConnection(settings: Option[ClientConnectionSettings] = None): (PublisherProbe[HttpRequest], SubscriberProbe[HttpResponse]) = { + val requestPublisherProbe = StreamTestKit.PublisherProbe[HttpRequest]() + val responseSubscriberProbe = StreamTestKit.SubscriberProbe[HttpResponse]() + val connection = Http().outgoingConnection(hostname, port, settings = settings) connection.remoteAddress.getHostName shouldEqual hostname - + connection.remoteAddress.getPort shouldEqual port + Source(requestPublisherProbe).via(connection.flow).runWith(Sink(responseSubscriberProbe)) requestPublisherProbe -> responseSubscriberProbe } def acceptConnection(): (SubscriberProbe[HttpRequest], PublisherProbe[HttpResponse]) = { - connectionStreamSub.request(1) - val Http.IncomingConnection(remoteAddress, flow) = connectionStream.expectNext() + connSourceSub.request(1) + val incomingConnection = connSource.expectNext() + val sink = PublisherSink[HttpRequest]() + val source = SubscriberSource[HttpResponse]() + val mm = incomingConnection.handleWith(Flow(sink, source)) val requestSubscriberProbe = StreamTestKit.SubscriberProbe[HttpRequest]() val responsePublisherProbe = StreamTestKit.PublisherProbe[HttpResponse]() - Flow(Sink(requestSubscriberProbe), Source(responsePublisherProbe)).join(flow).run() - + mm.get(sink).subscribe(requestSubscriberProbe) + responsePublisherProbe.subscribe(mm.get(source)) requestSubscriberProbe -> responsePublisherProbe } diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index 4aca8607d6..055859610c 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -17,15 +17,15 @@ object TestClient extends App { akka.log-dead-letters = off """) implicit val system = ActorSystem("ServerTest", testConf) - import akka.http.TestClient.system.dispatcher + implicit val fm = FlowMaterializer() + import system.dispatcher - implicit val materializer = FlowMaterializer() val host = "spray.io" println(s"Fetching HTTP server version of host `$host` ...") - val outgoingFlow = Http(system).connect(host) - val result = Source.singleton(HttpRequest() -> 'NoContext).via(outgoingFlow.flow).map(_._1).runWith(Sink.head) + val connection = Http().outgoingConnection(host) + val result = Source.singleton(HttpRequest()).via(connection.flow).runWith(Sink.head) result.map(_.header[headers.Server]) onComplete { case Success(res) ⇒ println(s"$host is running ${res mkString ", "}") diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index b5ea8e0d3e..645c15f47d 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -7,11 +7,9 @@ package akka.http import akka.actor.ActorSystem import akka.http.model._ import akka.stream.FlowMaterializer -import akka.stream.scaladsl.{ Flow, Sink } +import akka.stream.scaladsl.Flow import com.typesafe.config.{ ConfigFactory, Config } import HttpMethods._ -import scala.concurrent.Await -import scala.concurrent.duration._ object TestServer extends App { val testConf: Config = ConfigFactory.parseString(""" @@ -19,30 +17,21 @@ object TestServer extends App { akka.log-dead-letters = off """) implicit val system = ActorSystem("ServerTest", testConf) + implicit val fm = FlowMaterializer() - val requestHandler: HttpRequest ⇒ HttpResponse = { + val binding = Http().bind(interface = "localhost", port = 8080) + + binding startHandlingWithSyncHandler { case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ index case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒ HttpResponse(entity = "PONG!") case HttpRequest(GET, Uri.Path("/crash"), _, _, _) ⇒ sys.error("BOOM!") case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") } - implicit val materializer = FlowMaterializer() - - val Http.ServerSource(source, key) = Http(system).bind(interface = "localhost", port = 8080) - val materializedMap = source.to(Sink.foreach { - case Http.IncomingConnection(remoteAddress, flow) ⇒ - println("Accepted new connection from " + remoteAddress) - flow.join(Flow[HttpRequest].map(requestHandler)).run() - }).run() - - val serverBinding = Await.result(materializedMap.get(key), 3 seconds) - - println(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}") + println(s"Server online at http://localhost:8080") println("Press RETURN to stop...") Console.readLine() - serverBinding.close() system.shutdown() ////////////// helpers ////////////// diff --git a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala similarity index 97% rename from akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala rename to akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala index 446aa0567a..b94425def2 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala @@ -10,9 +10,7 @@ import akka.event.NoLogging import akka.util.ByteString import akka.stream.scaladsl._ import akka.stream.FlowMaterializer -import akka.stream.io.StreamTcp import akka.stream.testkit.{ AkkaSpec, StreamTestKit } -import akka.http.Http import akka.http.model._ import akka.http.util._ import headers._ @@ -20,7 +18,7 @@ import HttpEntity._ import MediaTypes._ import HttpMethods._ -class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside { +class HttpServerSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside { implicit val materializer = FlowMaterializer() "The server implementation" should { @@ -612,14 +610,18 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA } class TestSetup { - val netIn = StreamTestKit.PublisherProbe[ByteString] - val netOut = StreamTestKit.SubscriberProbe[ByteString] - val tcpConnection = StreamTcp.IncomingTcpConnection(null, Flow(Sink(netOut), Source(netIn))) + val requests = StreamTestKit.SubscriberProbe[HttpRequest] + val responses = StreamTestKit.PublisherProbe[HttpResponse] def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test"))))) - val pipeline = new HttpServerPipeline(settings, NoLogging) - val Http.IncomingConnection(_, httpPipelineFlow) = pipeline(tcpConnection) + val (netIn, netOut) = { + val netIn = StreamTestKit.PublisherProbe[ByteString] + val netOut = StreamTestKit.SubscriberProbe[ByteString] + val transportFlow = HttpServer.serverFlowToTransport(Flow(Sink(requests), Source(responses)), settings, NoLogging) + Source(netIn).via(transportFlow).runWith(Sink(netOut)) + netIn -> netOut + } def wipeDate(string: String) = string.fastSplit('\n').map { @@ -627,10 +629,6 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA case s ⇒ s }.mkString("\n") - val requests = StreamTestKit.SubscriberProbe[HttpRequest] - val responses = StreamTestKit.PublisherProbe[HttpResponse] - Flow(Sink(requests), Source(responses)).join(httpPipelineFlow).run() - val netInSub = netIn.expectSubscription() val netOutSub = netOut.expectSubscription() val requestsSub = requests.expectSubscription() @@ -647,4 +645,4 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA def closeNetworkInput(): Unit = netInSub.sendComplete() } -} +} \ No newline at end of file diff --git a/akka-http-testkit/src/test/scala/akka/http/testkit/ScalatestRouteTestSpec.scala b/akka-http-testkit/src/test/scala/akka/http/testkit/ScalatestRouteTestSpec.scala index 3f8ea8d999..4a74467932 100644 --- a/akka-http-testkit/src/test/scala/akka/http/testkit/ScalatestRouteTestSpec.scala +++ b/akka-http-testkit/src/test/scala/akka/http/testkit/ScalatestRouteTestSpec.scala @@ -15,7 +15,7 @@ import akka.http.server._ import akka.http.model._ import StatusCodes._ import HttpMethods._ -import ScalaRoutingDSL._ +import Directives._ class ScalatestRouteTestSpec extends FreeSpec with Matchers with ScalatestRouteTest { diff --git a/akka-http-tests/src/test/scala/akka/http/server/BasicRouteSpecs.scala b/akka-http-tests/src/test/scala/akka/http/server/BasicRouteSpecs.scala index 315a3be1fe..e129a502d0 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/BasicRouteSpecs.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/BasicRouteSpecs.scala @@ -7,7 +7,6 @@ package akka.http.server import akka.http.model import model.HttpMethods._ import model.StatusCodes -import akka.http.server.PathMatchers.{ Segment, IntNumber } class BasicRouteSpecs extends RoutingSpec { @@ -40,7 +39,6 @@ class BasicRouteSpecs extends RoutingSpec { val stringDirective = provide("The cat") val intDirective = provide(42) val doubleDirective = provide(23.0) - val symbolDirective = provide('abc) val dirStringInt = stringDirective & intDirective val dirStringIntDouble = dirStringInt & doubleDirective @@ -78,7 +76,7 @@ class BasicRouteSpecs extends RoutingSpec { } "Route disjunction" should { "work in the happy case" in { - val route = sealRoute((path("abc") | path("def")) { + val route = Route.seal((path("abc") | path("def")) { completeOk }) @@ -138,7 +136,7 @@ class BasicRouteSpecs extends RoutingSpec { case object MyException extends RuntimeException "Route sealing" should { "catch route execution exceptions" in { - Get("/abc") ~> ScalaRoutingDSL.sealRoute { + Get("/abc") ~> Route.seal { get { ctx ⇒ throw MyException } @@ -147,7 +145,7 @@ class BasicRouteSpecs extends RoutingSpec { } } "catch route building exceptions" in { - Get("/abc") ~> ScalaRoutingDSL.sealRoute { + Get("/abc") ~> Route.seal { get { throw MyException } @@ -157,7 +155,7 @@ class BasicRouteSpecs extends RoutingSpec { } "convert all rejections to responses" in { object MyRejection extends Rejection - Get("/abc") ~> ScalaRoutingDSL.sealRoute { + Get("/abc") ~> Route.seal { get { reject(MyRejection) } diff --git a/akka-http-tests/src/test/scala/akka/http/server/RoutingSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/RoutingSpec.scala index 616b19366b..c97ca9a281 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/RoutingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/RoutingSpec.scala @@ -8,7 +8,7 @@ import org.scalatest.{ WordSpec, Suite, Matchers } import akka.http.model.HttpResponse import akka.http.testkit.ScalatestRouteTest -trait GenericRoutingSpec extends Matchers with ScalaRoutingDSL with ScalatestRouteTest { this: Suite ⇒ +trait GenericRoutingSpec extends Matchers with Directives with ScalatestRouteTest { this: Suite ⇒ val Ok = HttpResponse() val completeOk = complete(Ok) diff --git a/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala index 2ff749c93c..5d0c625962 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala @@ -7,26 +7,20 @@ package akka.http.server import akka.http.marshallers.xml.ScalaXmlSupport import akka.http.server.directives.AuthenticationDirectives._ import com.typesafe.config.{ ConfigFactory, Config } -import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.stream.FlowMaterializer -import akka.util.Timeout import akka.http.Http -import akka.http.model._ object TestServer extends App { val testConf: Config = ConfigFactory.parseString(""" akka.loglevel = INFO - akka.log-dead-letters = off - """) + akka.log-dead-letters = off""") implicit val system = ActorSystem("ServerTest", testConf) import system.dispatcher implicit val materializer = FlowMaterializer() - implicit val askTimeout: Timeout = 500.millis - val serverSource = Http(system).bind(interface = "localhost", port = 8080) - - import ScalaRoutingDSL._ + import ScalaXmlSupport._ + import Directives._ def auth = HttpBasicAuthenticator.provideUserName { @@ -34,11 +28,9 @@ object TestServer extends App { case _ ⇒ false } - // FIXME: a simple `import ScalaXmlSupport._` should suffice but currently doesn't because - // of #16190 - implicit val html = ScalaXmlSupport.nodeSeqMarshaller(MediaTypes.`text/html`) + val binding = Http().bind(interface = "localhost", port = 8080) - handleConnections(serverSource) withRoute { + val materializedMap = binding startHandlingWith { get { path("") { complete(index) @@ -58,9 +50,8 @@ object TestServer extends App { } println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") - Console.readLine() - system.shutdown() + binding.unbind(materializedMap).onComplete(_ ⇒ system.shutdown()) lazy val index = diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/AuthenticationDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/AuthenticationDirectivesSpec.scala index b29cb3c6b2..32953548af 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/directives/AuthenticationDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/AuthenticationDirectivesSpec.scala @@ -31,7 +31,7 @@ class AuthenticationDirectivesSpec extends RoutingSpec { } ~> check { rejection shouldEqual AuthenticationFailedRejection(CredentialsRejected, challenge) } } "reject requests with illegal Authorization header with 401" in { - Get() ~> RawHeader("Authorization", "bob alice") ~> sealRoute { + Get() ~> RawHeader("Authorization", "bob alice") ~> Route.seal { dontAuth { echoComplete } } ~> check { status shouldEqual StatusCodes.Unauthorized @@ -52,7 +52,7 @@ class AuthenticationDirectivesSpec extends RoutingSpec { "properly handle exceptions thrown in its inner route" in { object TestException extends RuntimeException Get() ~> Authorization(BasicHttpCredentials("Alice", "")) ~> { - sealRoute { + Route.seal { doAuth { _ ⇒ throw TestException } } } ~> check { status shouldEqual StatusCodes.InternalServerError } @@ -66,7 +66,7 @@ class AuthenticationDirectivesSpec extends RoutingSpec { } val bothAuth = dontAuth | otherAuth - Get() ~> sealRoute { + Get() ~> Route.seal { bothAuth { echoComplete } } ~> check { status shouldEqual StatusCodes.Unauthorized diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala index 643fb23fd0..40d9610b8c 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala @@ -71,7 +71,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { } "be transparent to non-200 responses" in { - Get() ~> addHeader(Range(ByteRange(1, 2))) ~> sealRoute(wrs(reject())) ~> check { + Get() ~> addHeader(Range(ByteRange(1, 2))) ~> Route.seal(wrs(reject())) ~> check { status == NotFound headers.exists { case `Content-Range`(_, _) ⇒ true; case _ ⇒ false } shouldEqual false } diff --git a/akka-http/src/main/scala/akka/http/server/Route.scala b/akka-http/src/main/scala/akka/http/server/Route.scala new file mode 100644 index 0000000000..50501656bc --- /dev/null +++ b/akka-http/src/main/scala/akka/http/server/Route.scala @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.server + +import scala.concurrent.Future +import akka.stream.scaladsl.Flow +import akka.http.model.{ HttpRequest, HttpResponse } +import akka.http.util.FastFuture._ + +object Route { + + /** + * Helper for constructing a Route from a function literal. + */ + def apply(f: Route): Route = f + + /** + * "Seals" a route by wrapping it with exception handling and rejection conversion. + */ + def seal(route: Route)(implicit setup: RoutingSetup): Route = { + import directives.ExecutionDirectives._ + import setup._ + val sealedExceptionHandler = + if (exceptionHandler.isDefault) exceptionHandler + else exceptionHandler orElse ExceptionHandler.default(settings) + val sealedRejectionHandler = + if (rejectionHandler.isDefault) rejectionHandler + else rejectionHandler orElse RejectionHandler.default + handleExceptions(sealedExceptionHandler) { + handleRejections(sealedRejectionHandler) { + route + } + } + } + + /** + * Turns a `Route` into an server flow. + */ + def handlerFlow(route: Route)(implicit setup: RoutingSetup): Flow[HttpRequest, HttpResponse] = + Flow[HttpRequest].mapAsync(asyncHandler(route)) + + /** + * Turns a `Route` into an async handler function. + */ + def asyncHandler(route: Route)(implicit setup: RoutingSetup): HttpRequest ⇒ Future[HttpResponse] = { + import setup._ + val sealedRoute = seal(route) + request ⇒ + sealedRoute(new RequestContextImpl(request, routingLog.requestLog(request), setup.settings)).fast.map { + case RouteResult.Complete(response) ⇒ response + case RouteResult.Rejected(rejected) ⇒ throw new IllegalStateException(s"Unhandled rejections '$rejected', unsealed RejectionHandler?!") + } + } +} \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/server/RouteResult.scala b/akka-http/src/main/scala/akka/http/server/RouteResult.scala index bf12eb0563..d71cc93780 100644 --- a/akka-http/src/main/scala/akka/http/server/RouteResult.scala +++ b/akka-http/src/main/scala/akka/http/server/RouteResult.scala @@ -5,8 +5,8 @@ package akka.http.server import scala.collection.immutable - -import akka.http.model.HttpResponse +import akka.stream.scaladsl.Flow +import akka.http.model.{ HttpRequest, HttpResponse } /** * The result of handling a request. @@ -19,4 +19,7 @@ sealed trait RouteResult object RouteResult { final case class Complete(response: HttpResponse) extends RouteResult final case class Rejected(rejections: immutable.Seq[Rejection]) extends RouteResult + + implicit def route2HandlerFlow(route: Route)(implicit setup: RoutingSetup): Flow[HttpRequest, HttpResponse] = + Route.handlerFlow(route) } diff --git a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala index 3e61358f4e..5d4f399272 100644 --- a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala +++ b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala @@ -4,13 +4,12 @@ package akka.http.server -import akka.stream.FlowMaterializer - import scala.concurrent.ExecutionContext -import akka.actor.{ ActorSystem, ActorContext } import akka.event.LoggingAdapter -import akka.http.model.HttpRequest +import akka.actor.{ ActorSystem, ActorContext } +import akka.stream.FlowMaterializer import akka.http.Http +import akka.http.model.HttpRequest /** * Provides a ``RoutingSetup`` for a given connection. diff --git a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala deleted file mode 100644 index ebfdb5555a..0000000000 --- a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.server - -import scala.concurrent.{ ExecutionContext, Future } -import akka.stream.scaladsl._ -import akka.stream.FlowMaterializer -import akka.http.util.FastFuture -import akka.http.model.{ HttpRequest, HttpResponse } -import akka.http.Http -import FastFuture._ - -/** - * The main entry point into the Scala routing DSL. - * - * `import ScalaRoutingDSL._` to bring everything required into scope. - */ -trait ScalaRoutingDSL extends Directives { - - sealed trait Applicator[R] { - def withRoute(route: Route): R - def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): R - def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): R - } - - def handleConnections(serverSource: Http.ServerSource)(implicit fm: FlowMaterializer, - setupProvider: RoutingSetupProvider): Applicator[Unit] = { - new Applicator[Unit] { - def withRoute(route: Route): Unit = - run(routeRunner(route, _)) - - def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): Unit = - withAsyncHandler(request ⇒ FastFuture.successful(handler(request))) - - def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): Unit = - run(_ ⇒ handler) - - private def run(f: RoutingSetup ⇒ HttpRequest ⇒ Future[HttpResponse]): Unit = - serverSource.source.foreach { - case connection @ Http.IncomingConnection(remoteAddress, flow) ⇒ - val setup = setupProvider(connection) - setup.routingLog.log.debug("Accepted new connection from " + remoteAddress) - val runner = f(setup) - flow.join(Flow[HttpRequest].mapAsync(request ⇒ runner(request))).run()(fm) - } - } - } - - def routeRunner(route: Route, setup: RoutingSetup): HttpRequest ⇒ Future[HttpResponse] = { - import setup._ - val sealedRoute = sealRoute(route)(setup) - request ⇒ - sealedRoute(new RequestContextImpl(request, routingLog.requestLog(request), setup.settings)).fast.map { - case RouteResult.Complete(response) ⇒ response - case RouteResult.Rejected(rejected) ⇒ throw new IllegalStateException(s"Unhandled rejections '$rejected', unsealed RejectionHandler?!") - } - } - - /** - * "Seals" a route by wrapping it with exception handling and rejection conversion. - */ - def sealRoute(route: Route)(implicit setup: RoutingSetup): Route = { - import setup._ - val sealedExceptionHandler = - if (exceptionHandler.isDefault) exceptionHandler - else exceptionHandler orElse ExceptionHandler.default(settings) - val sealedRejectionHandler = - if (rejectionHandler.isDefault) rejectionHandler - else rejectionHandler orElse RejectionHandler.default - handleExceptions(sealedExceptionHandler) { - handleRejections(sealedRejectionHandler) { - route - } - } - } -} - -object ScalaRoutingDSL extends ScalaRoutingDSL \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/server/package.scala b/akka-http/src/main/scala/akka/http/server/package.scala index ab19fd1b0c..ffcd0cbb74 100644 --- a/akka-http/src/main/scala/akka/http/server/package.scala +++ b/akka-http/src/main/scala/akka/http/server/package.scala @@ -15,10 +15,5 @@ package object server { type PathMatcher0 = PathMatcher[Unit] type PathMatcher1[T] = PathMatcher[Tuple1[T]] - /** - * Helper for constructing a Route from a function literal. - */ - def Route(f: Route): Route = f - def FIXME = throw new RuntimeException("Not yet implemented") } \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala index d798e9ea41..a21c12ab9b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -3,14 +3,11 @@ */ package akka.stream.io -import akka.stream.io.StreamTcp.{ TcpServerBinding, IncomingTcpConnection } +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.util.ByteString import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec -import akka.util.ByteString -import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.{ Future, Await } -import scala.concurrent.duration._ import akka.stream.scaladsl._ class TcpFlowSpec extends AkkaSpec with TcpHelper { @@ -26,7 +23,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val tcpReadProbe = new TcpReadProbe() val tcpWriteProbe = new TcpWriteProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() validateServerClientCommunication(testData, serverConnection, tcpReadProbe, tcpWriteProbe) @@ -42,7 +39,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).via(StreamTcp(system).connect(server.address).flow).to(Sink.ignore).run() + Source(testInput).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink.ignore).run() val serverConnection = server.waitAccept() serverConnection.read(256) @@ -57,7 +54,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val idle = new TcpWriteProbe() // Just register an idle upstream val resultFuture = Source(idle.publisherProbe) - .via(StreamTcp(system).connect(server.address).flow) + .via(StreamTcp().outgoingConnection(server.address).flow) .fold(ByteString.empty)((acc, in) ⇒ acc ++ in) val serverConnection = server.waitAccept() @@ -76,7 +73,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() tcpWriteProbe.close() @@ -93,7 +90,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() tcpReadProbe.close() @@ -113,7 +110,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() // FIXME: here (and above tests) add a chitChat() method ensuring this works even after prior communication @@ -138,7 +135,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp(system).connect(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address).flow).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() serverConnection.abort() @@ -159,22 +156,20 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe1 = new TcpWriteProbe() val tcpReadProbe2 = new TcpReadProbe() val tcpWriteProbe2 = new TcpWriteProbe() - val outgoingFlow = StreamTcp(system).connect(server.address) + val outgoingConnection = StreamTcp().outgoingConnection(server.address) - val mm1 = Source(tcpWriteProbe1.publisherProbe).via(outgoingFlow.flow).to(Sink(tcpReadProbe1.subscriberProbe)).run() + val mm1 = Source(tcpWriteProbe1.publisherProbe).via(outgoingConnection.flow).to(Sink(tcpReadProbe1.subscriberProbe)).run() val serverConnection1 = server.waitAccept() - val mm2 = Source(tcpWriteProbe2.publisherProbe).via(outgoingFlow.flow).to(Sink(tcpReadProbe2.subscriberProbe)).run() + val mm2 = Source(tcpWriteProbe2.publisherProbe).via(outgoingConnection.flow).to(Sink(tcpReadProbe2.subscriberProbe)).run() val serverConnection2 = server.waitAccept() validateServerClientCommunication(testData, serverConnection1, tcpReadProbe1, tcpWriteProbe1) validateServerClientCommunication(testData, serverConnection2, tcpReadProbe2, tcpWriteProbe2) // Since we have already communicated over the connections we can have short timeouts for the futures - val outgoingConnection1 = Await.result(mm1.get(outgoingFlow.key), 100 millis) - val outgoingConnection2 = Await.result(mm2.get(outgoingFlow.key), 100 millis) - - outgoingConnection1.remoteAddress.getPort should be(server.address.getPort) - outgoingConnection2.remoteAddress.getPort should be(server.address.getPort) - outgoingConnection1.localAddress.getPort should not be (outgoingConnection2.localAddress.getPort) + outgoingConnection.remoteAddress.getPort should be(server.address.getPort) + val localAddress1 = Await.result(outgoingConnection.localAddress(mm1), 100.millis) + val localAddress2 = Await.result(outgoingConnection.localAddress(mm2), 100.millis) + localAddress1.getPort should not be localAddress2.getPort tcpWriteProbe1.close() tcpReadProbe1.close() @@ -187,47 +182,39 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { "TCP listen stream" must { // Reusing handler - val echoHandler = ForeachSink[IncomingTcpConnection] { incoming ⇒ - incoming.stream.join(Flow.empty).run() - } + val echoHandler = ForeachSink[StreamTcp.IncomingConnection] { _ handleWith Flow[ByteString] } "be able to implement echo" in { - import system.dispatcher - val serverAddress = temporaryServerAddress - val binding = StreamTcp(system).bind(serverAddress) - val echoServer = binding.to(echoHandler).run() + val binding = StreamTcp().bind(serverAddress) + val echoServerMM = binding.connections.to(echoHandler).run() - val echoServerFinish = echoServer.get(echoHandler) - val echoServerBinding = echoServer.get(binding) + val echoServerFinish = echoServerMM.get(echoHandler) // make sure that the server has bound to the socket - Await.result(echoServerBinding, 3.seconds) + Await.result(binding.localAddress(echoServerMM), 3.seconds) val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val resultFuture = - Source(testInput).via(StreamTcp(system).connect(serverAddress).flow).fold(ByteString.empty)((acc, in) ⇒ acc ++ in) + Source(testInput).via(StreamTcp().outgoingConnection(serverAddress).flow).fold(ByteString.empty)((acc, in) ⇒ acc ++ in) Await.result(resultFuture, 3.seconds) should be(expectedOutput) - echoServerBinding.foreach(_.close) - Await.result(echoServerFinish, 3.seconds) + Await.result(binding.unbind(echoServerMM), 3.seconds) + Await.result(echoServerFinish, 1.second) } "work with a chain of echoes" in { - import system.dispatcher - val serverAddress = temporaryServerAddress val binding = StreamTcp(system).bind(serverAddress) - val echoServer = binding.to(echoHandler).run() + val echoServerMM = binding.connections.to(echoHandler).run() - val echoServerFinish = echoServer.get(echoHandler) - val echoServerBinding = echoServer.get(binding) + val echoServerFinish = echoServerMM.get(echoHandler) // make sure that the server has bound to the socket - Await.result(echoServerBinding, 3.seconds) + Await.result(binding.localAddress(echoServerMM), 3.seconds) - val echoConnection = StreamTcp(system).connect(serverAddress).flow + val echoConnection = StreamTcp().outgoingConnection(serverAddress).flow val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) @@ -240,9 +227,9 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { .via(echoConnection) .fold(ByteString.empty)((acc, in) ⇒ acc ++ in) - Await.result(resultFuture, 3.seconds) should be(expectedOutput) - echoServerBinding.foreach(_.close) - Await.result(echoServerFinish, 3.seconds) + Await.result(resultFuture, 5.seconds) should be(expectedOutput) + Await.result(binding.unbind(echoServerMM), 3.seconds) + Await.result(echoServerFinish, 1.second) } } diff --git a/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala index 6302d37e32..3140d60e4f 100644 --- a/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala @@ -3,68 +3,203 @@ */ package akka.stream.io -import akka.actor._ -import akka.io.Inet.SocketOption -import akka.io.Tcp -import akka.pattern.ask -import akka.stream.impl._ -import akka.stream.MaterializerSettings -import akka.stream.scaladsl._ -import akka.util.{ ByteString, Timeout } -import java.io.Closeable import java.net.{ InetSocketAddress, URLEncoder } -import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } -import scala.collection._ -import scala.concurrent.duration._ -import scala.concurrent.{ Promise, ExecutionContext, Future } +import org.reactivestreams.{ Processor, Subscriber, Subscription } import scala.util.control.NoStackTrace import scala.util.{ Failure, Success } +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.{ Promise, ExecutionContext, Future } +import akka.util.ByteString +import akka.io.Inet.SocketOption +import akka.io.Tcp +import akka.stream.{ FlowMaterializer, MaterializerSettings } +import akka.stream.scaladsl._ +import akka.stream.impl._ +import akka.actor._ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { - override def lookup = StreamTcp - override def createExtension(system: ExtendedActorSystem): StreamTcpExt = new StreamTcpExt(system) - override def get(system: ActorSystem): StreamTcpExt = super.get(system) - /** - * The materialized result of an outgoing TCP connection stream. + * Represents a prospective TCP server binding. */ - case class OutgoingTcpConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) + trait ServerBinding { + /** + * The local address of the endpoint bound by the materialization of the `connections` [[Source]] + * whose [[MaterializedMap]] is passed as parameter. + */ + def localAddress(materializedMap: MaterializedMap): Future[InetSocketAddress] - /** - * A flow representing an outgoing TCP connection, and the key used to get information about the materialized connection. - */ - case class OutgoingTcpFlow(flow: Flow[ByteString, ByteString], key: Key { type MaterializedType = Future[StreamTcp.OutgoingTcpConnection] }) + /** + * The stream of accepted incoming connections. + * Can be materialized several times but only one subscription can be "live" at one time, i.e. + * subsequent materializations will reject subscriptions with an [[BindFailedException]] if the previous + * materialization still has an uncancelled subscription. + * Cancelling the subscription to a materialization of this source will cause the listening port to be unbound. + */ + def connections: Source[IncomingConnection] - /** - * The materialized result of a bound server socket. - */ - abstract sealed case class TcpServerBinding(localAddress: InetSocketAddress) extends Closeable - - /** - * INTERNAL API - */ - private[akka] object TcpServerBinding { - def apply(localAddress: InetSocketAddress): TcpServerBinding = - new TcpServerBinding(localAddress) { - override def close() = () - } - - def apply(localAddress: InetSocketAddress, closeable: Closeable): TcpServerBinding = - new TcpServerBinding(localAddress) { - override def close() = closeable.close() - } + /** + * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections` + * [[Source]] whose [[MaterializedMap]] is passed as parameter. + * + * The produced [[Future]] is fulfilled when the unbinding has been completed. + */ + def unbind(materializedMap: MaterializedMap): Future[Unit] } /** - * An incoming TCP connection. + * Represents an accepted incoming TCP connection. */ - case class IncomingTcpConnection(remoteAddress: InetSocketAddress, stream: Flow[ByteString, ByteString]) + trait IncomingConnection { + /** + * The local address this connection is bound to. + */ + def localAddress: InetSocketAddress + + /** + * The remote address this connection is bound to. + */ + def remoteAddress: InetSocketAddress + + /** + * Handles the connection using the given flow, which is materialized exactly once and the respective + * [[MaterializedMap]] returned. + * + * Convenience shortcut for: `flow.join(handler).run()`. + */ + def handleWith(handler: Flow[ByteString, ByteString])(implicit materializer: FlowMaterializer): MaterializedMap + + /** + * A flow representing the client on the other side of the connection. + * This flow can be materialized only once. + */ + def flow: Flow[ByteString, ByteString] + } /** - * The exception thrown on bind or accept failures. + * Represents a prospective outgoing TCP connection. */ - class IncomingTcpException(msg: String) extends RuntimeException(msg) with NoStackTrace + sealed trait OutgoingConnection { + /** + * The remote address this connection is or will be bound to. + */ + def remoteAddress: InetSocketAddress + + /** + * The local address of the endpoint bound by the materialization of the connection materialization + * whose [[MaterializedMap]] is passed as parameter. + */ + def localAddress(mMap: MaterializedMap): Future[InetSocketAddress] + + /** + * Handles the connection using the given flow. + * This method can be called several times, every call will materialize the given flow exactly once thereby + * triggering a new connection attempt to the `remoteAddress`. + * If the connection cannot be established the materialized stream will immediately be terminated + * with a [[ConnectionAttemptFailedException]]. + * + * Convenience shortcut for: `flow.join(handler).run()`. + */ + def handleWith(handler: Flow[ByteString, ByteString])(implicit materializer: FlowMaterializer): MaterializedMap + + /** + * A flow representing the server on the other side of the connection. + * This flow can be materialized several times, every materialization will open a new connection to the + * `remoteAddress`. If the connection cannot be established the materialized stream will immediately be terminated + * with a [[ConnectionAttemptFailedException]]. + */ + def flow: Flow[ByteString, ByteString] + } + + case object BindFailedException extends RuntimeException with NoStackTrace + + class ConnectionException(message: String) extends RuntimeException(message) + + class ConnectionAttemptFailedException(val endpoint: InetSocketAddress) extends ConnectionException(s"Connection attempt to $endpoint failed") + + //////////////////// EXTENSION SETUP /////////////////// + + def apply()(implicit system: ActorSystem): StreamTcpExt = super.apply(system) + + def lookup() = StreamTcp + + def createExtension(system: ExtendedActorSystem): StreamTcpExt = new StreamTcpExt(system) +} + +class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension { + import StreamTcpExt._ + import StreamTcp._ + import system.dispatcher + + private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager], name = "IO-TCP-STREAM") + + /** + * Creates a [[ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. + */ + def bind(endpoint: InetSocketAddress, + backlog: Int = 100, + options: immutable.Traversable[SocketOption] = Nil, + idleTimeout: Duration = Duration.Inf): ServerBinding = { + val connectionSource = new KeyedActorFlowSource[IncomingConnection] { + override type MaterializedType = (Future[InetSocketAddress], Future[() ⇒ Future[Unit]]) + override def attach(flowSubscriber: Subscriber[IncomingConnection], + materializer: ActorBasedFlowMaterializer, + flowName: String): MaterializedType = { + val localAddressPromise = Promise[InetSocketAddress]() + val unbindPromise = Promise[() ⇒ Future[Unit]]() + manager ! StreamTcpManager.Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options, + idleTimeout) + localAddressPromise.future -> unbindPromise.future + } + } + new ServerBinding { + def localAddress(mm: MaterializedMap) = mm.get(connectionSource)._1 + def connections = connectionSource + def unbind(mm: MaterializedMap): Future[Unit] = mm.get(connectionSource)._2.flatMap(_()) + } + } + + /** + * Creates an [[OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. + */ + def outgoingConnection(remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress] = None, + options: immutable.Traversable[SocketOption] = Nil, + connectTimeout: Duration = Duration.Inf, + idleTimeout: Duration = Duration.Inf): OutgoingConnection = { + val remoteAddr = remoteAddress + val key = new PreMaterializedOutgoingKey() + val stream = Pipe(key) { () ⇒ + val processorPromise = Promise[Processor[ByteString, ByteString]]() + val localAddressPromise = Promise[InetSocketAddress]() + manager ! StreamTcpManager.Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, + connectTimeout, idleTimeout) + (new DelayedInitProcessor[ByteString, ByteString](processorPromise.future), localAddressPromise.future) + } + new OutgoingConnection { + def remoteAddress = remoteAddr + def localAddress(mm: MaterializedMap) = mm.get(key) + def flow = stream + def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) = + flow.join(handler).run() + } + } +} + +/** + * INTERNAL API + */ +private[akka] object StreamTcpExt { + /** + * INTERNAL API + */ + class PreMaterializedOutgoingKey extends Key { + type MaterializedType = Future[InetSocketAddress] + + override def materialize(map: MaterializedMap) = + throw new IllegalStateException("This key has already been materialized by the TCP Processor") + } } /** @@ -75,8 +210,8 @@ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[ private val setVarFuture = implFuture.andThen { case Success(p) ⇒ impl = p } override def onSubscribe(s: Subscription): Unit = implFuture.onComplete { - case Success(impl) ⇒ impl.onSubscribe(s) - case Failure(_) ⇒ s.cancel() + case Success(x) ⇒ x.onSubscribe(s) + case Failure(_) ⇒ s.cancel() } override def onError(t: Throwable): Unit = { @@ -92,106 +227,11 @@ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[ override def onNext(t: I): Unit = impl.onNext(t) override def subscribe(s: Subscriber[_ >: O]): Unit = setVarFuture.onComplete { - case Success(impl) ⇒ impl.subscribe(s) - case Failure(e) ⇒ s.onError(e) + case Success(x) ⇒ x.subscribe(s) + case Failure(e) ⇒ s.onError(e) } } -/** - * INTERNAL API - */ -private[akka] object StreamTcpExt { - /** - * INTERNAL API - */ - class PreMaterializedOutgoingKey extends Key { - type MaterializedType = Future[StreamTcp.OutgoingTcpConnection] - - override def materialize(map: MaterializedMap) = - throw new IllegalArgumentException("This key have already been materialized by the TCP Processor") - } -} - -class StreamTcpExt(val system: ExtendedActorSystem) extends Extension { - import StreamTcpExt._ - import StreamTcp._ - - private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager], name = "IO-TCP-STREAM") - - /** - * Creates a Flow that represents a TCP connection to a remote host. The actual connection is only attempted - * when the Flow is materialized. The returned Flow is reusable, each new materialization will attempt to open - * a new connection to the remote host. - * - * @param remoteAddress the address to connect to - * @param localAddress optionally specifies a specific address to bind to - * @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options. - * @param connectTimeout the desired timeout for connection establishment, infinite means "no timeout" - * @param idleTimeout the desired idle timeout on the connection, infinite means "no timeout" - * - */ - def connect(remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress] = None, - options: immutable.Traversable[SocketOption] = Nil, - connectTimeout: Duration = Duration.Inf, - idleTimeout: Duration = Duration.Inf): OutgoingTcpFlow = { - implicit val t = Timeout(3.seconds) - import system.dispatcher - - val key = new PreMaterializedOutgoingKey() - - val pipe = Pipe(key) { () ⇒ - { - val promise = Promise[OutgoingTcpConnection] - val future = (StreamTcp(system).manager ? StreamTcpManager.Connect(remoteAddress, localAddress, None, options, connectTimeout, idleTimeout)) - .mapTo[StreamTcpManager.ConnectReply] - future.map(r ⇒ OutgoingTcpConnection(r.remoteAddress, r.localAddress)).onComplete(promise.complete(_)) - (new DelayedInitProcessor[ByteString, ByteString](future.map(_.processor)), promise.future) - } - } - - StreamTcp.OutgoingTcpFlow(pipe, key) - } - - /** - * Returns a Source that represents a port listening to incoming connections. The actual binding to the local port - * happens when the Source is first materialized. This Source is not reusable until the listen port becomes available - * again. - * - * @param localAddress the socket address to bind to; use port zero for automatic assignment (i.e. an ephemeral port) - * @param backlog the number of unaccepted connections the O/S - * kernel will hold for this port before refusing connections. - * @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options. - * @param idleTimeout the desired idle timeout on the accepted connections, infinite means "no timeout" - */ - def bind(localAddress: InetSocketAddress, - backlog: Int = 100, - options: immutable.Traversable[SocketOption] = Nil, - idleTimeout: Duration = Duration.Inf): KeyedSource[IncomingTcpConnection] { type MaterializedType = Future[TcpServerBinding] } = { - new KeyedActorFlowSource[IncomingTcpConnection] { - implicit val t = Timeout(3.seconds) - import system.dispatcher - - override def attach(flowSubscriber: Subscriber[IncomingTcpConnection], - materializer: ActorBasedFlowMaterializer, - flowName: String): MaterializedType = { - val bindingFuture = (StreamTcp(system).manager ? StreamTcpManager.Bind(localAddress, None, backlog, options, idleTimeout)) - .mapTo[StreamTcpManager.BindReply] - - bindingFuture.map(_.connectionStream).onComplete { - case Success(impl) ⇒ impl.subscribe(flowSubscriber) - case Failure(e) ⇒ flowSubscriber.onError(e) - } - - bindingFuture.map { bf ⇒ TcpServerBinding(bf.localAddress, bf.closeable) } - } - - override type MaterializedType = Future[TcpServerBinding] - } - } - -} - /** * INTERNAL API */ @@ -199,34 +239,24 @@ private[io] object StreamTcpManager { /** * INTERNAL API */ - private[io] case class ConnectReply(remoteAddress: InetSocketAddress, - localAddress: InetSocketAddress, - processor: Processor[ByteString, ByteString]) + private[io] case class Connect(processorPromise: Promise[Processor[ByteString, ByteString]], + localAddressPromise: Promise[InetSocketAddress], + remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress], + options: immutable.Traversable[SocketOption], + connectTimeout: Duration, + idleTimeout: Duration) /** * INTERNAL API */ - private[io] case class Connect(remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress] = None, - materializerSettings: Option[MaterializerSettings] = None, - options: immutable.Traversable[SocketOption] = Nil, - connectTimeout: Duration = Duration.Inf, - idleTimeout: Duration = Duration.Inf) - /** - * INTERNAL API - */ - private[io] case class Bind(localAddress: InetSocketAddress, - settings: Option[MaterializerSettings] = None, - backlog: Int = 100, - options: immutable.Traversable[SocketOption] = Nil, - idleTimeout: Duration = Duration.Inf) - - /** - * INTERNAL API - */ - private[io] case class BindReply(localAddress: InetSocketAddress, - connectionStream: Publisher[StreamTcp.IncomingTcpConnection], - closeable: Closeable) + private[io] case class Bind(localAddressPromise: Promise[InetSocketAddress], + unbindPromise: Promise[() ⇒ Future[Unit]], + flowSubscriber: Subscriber[StreamTcp.IncomingConnection], + endpoint: InetSocketAddress, + backlog: Int, + options: immutable.Traversable[SocketOption], + idleTimeout: Duration) /** * INTERNAL API @@ -242,32 +272,26 @@ private[akka] class StreamTcpManager extends Actor { import akka.stream.io.StreamTcpManager._ var nameCounter = 0 - def encName(prefix: String, address: InetSocketAddress) = { + def encName(prefix: String, endpoint: InetSocketAddress) = { nameCounter += 1 - s"$prefix-$nameCounter-${URLEncoder.encode(address.toString, "utf-8")}" + s"$prefix-$nameCounter-${URLEncoder.encode(endpoint.toString, "utf-8")}" } def receive: Receive = { - case Connect(remoteAddress, localAddress, maybeMaterializerSettings, options, connectTimeout, idleTimeout) ⇒ + case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, connectTimeout, _) ⇒ val connTimeout = connectTimeout match { case x: FiniteDuration ⇒ Some(x) case _ ⇒ None } - val materializerSettings = maybeMaterializerSettings getOrElse MaterializerSettings(context.system) - - val processorActor = context.actorOf(TcpStreamActor.outboundProps( + val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true), - requester = sender(), - settings = materializerSettings), name = encName("client", remoteAddress)) + materializerSettings = MaterializerSettings(context.system)), name = encName("client", remoteAddress)) processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor)) - case Bind(localAddress, maybeMaterializerSettings, backlog, options, idleTimeout) ⇒ - val materializerSettings = maybeMaterializerSettings getOrElse MaterializerSettings(context.system) - - val publisherActor = context.actorOf(TcpListenStreamActor.props( - Tcp.Bind(context.system.deadLetters, localAddress, backlog, options, pullMode = true), - requester = sender(), - materializerSettings), name = encName("server", localAddress)) + case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options, _) ⇒ + val publisherActor = context.actorOf(TcpListenStreamActor.props(localAddressPromise, unbindPromise, + flowSubscriber, Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true), + MaterializerSettings(context.system)), name = encName("server", endpoint)) // this sends the ExposedPublisher message to the publisher actor automatically ActorPublisher[Any](publisherActor) } diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala index f8d04cbf5e..1aaff85999 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala @@ -3,7 +3,10 @@ */ package akka.stream.io +import java.net.InetSocketAddress + import akka.io.{ IO, Tcp } +import scala.concurrent.Promise import scala.util.control.NoStackTrace import akka.actor.{ ActorRefFactory, Actor, Props, ActorRef, Status } import akka.stream.impl._ @@ -20,8 +23,13 @@ private[akka] object TcpStreamActor { case object WriteAck extends Tcp.Event class TcpStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace - def outboundProps(connectCmd: Connect, requester: ActorRef, settings: MaterializerSettings): Props = - Props(new OutboundTcpStreamActor(connectCmd, requester, settings)).withDispatcher(settings.dispatcher) + def outboundProps(processorPromise: Promise[Processor[ByteString, ByteString]], + localAddressPromise: Promise[InetSocketAddress], + connectCmd: Connect, + materializerSettings: MaterializerSettings): Props = + Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, connectCmd, + materializerSettings)).withDispatcher(materializerSettings.dispatcher) + def inboundProps(connection: ActorRef, settings: MaterializerSettings): Props = Props(new InboundTcpStreamActor(connection, settings)).withDispatcher(settings.dispatcher) } @@ -201,7 +209,9 @@ private[akka] class InboundTcpStreamActor( /** * INTERNAL API */ -private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requester: ActorRef, _settings: MaterializerSettings) +private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[ByteString, ByteString]], + localAddressPromise: Promise[InetSocketAddress], + val connectCmd: Connect, _settings: MaterializerSettings) extends TcpStreamActor(_settings) { import TcpStreamActor._ import context.system @@ -222,11 +232,14 @@ private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requeste connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) tcpOutputs.setConnection(connection) tcpInputs.setConnection(connection) - requester ! StreamTcpManager.ConnectReply(remoteAddress, localAddress, exposedProcessor) + localAddressPromise.success(localAddress) + processorPromise.success(exposedProcessor) initSteps.become(Actor.emptyBehavior) + case f: CommandFailed ⇒ val ex = new TcpStreamException("Connection failed.") - requester ! Status.Failure(ex) + localAddressPromise.failure(ex) + processorPromise.failure(ex) fail(ex) } } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala index 5a5f6c8dee..41111c1642 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala @@ -3,35 +3,38 @@ */ package akka.stream.io -import java.io.Closeable - -import akka.actor._ +import java.net.InetSocketAddress +import akka.stream.io.StreamTcp.ConnectionException +import org.reactivestreams.Subscriber +import scala.concurrent.{ Future, Promise } +import akka.util.ByteString import akka.io.Tcp._ import akka.io.{ IO, Tcp } -import akka.stream.MaterializerSettings +import akka.stream.{ FlowMaterializer, MaterializerSettings } +import akka.stream.scaladsl.{ Flow, Pipe } import akka.stream.impl._ -import akka.stream.scaladsl.{ Pipe, Flow } -import akka.util.ByteString -import org.reactivestreams.{ Processor, Publisher } - -import scala.util.control.NoStackTrace +import akka.actor._ /** * INTERNAL API */ private[akka] object TcpListenStreamActor { - def props(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings): Props = { - Props(new TcpListenStreamActor(bindCmd, requester, settings)) + def props(localAddressPromise: Promise[InetSocketAddress], + unbindPromise: Promise[() ⇒ Future[Unit]], + flowSubscriber: Subscriber[StreamTcp.IncomingConnection], + bindCmd: Tcp.Bind, materializerSettings: MaterializerSettings): Props = { + Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, bindCmd, materializerSettings)) } - } /** * INTERNAL API */ -private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings) extends Actor +private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocketAddress], + unbindPromise: Promise[() ⇒ Future[Unit]], + flowSubscriber: Subscriber[StreamTcp.IncomingConnection], + bindCmd: Tcp.Bind, settings: MaterializerSettings) extends Actor with Pump with Stash { - import akka.stream.io.TcpListenStreamActor._ import context.system object primaryOutputs extends SimpleOutputs(self, pump = this) { @@ -48,7 +51,9 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, def getExposedPublisher = exposedPublisher } + private val unboundPromise = Promise[Unit]() private var finished = false + override protected def pumpFinished(): Unit = { if (!finished) { finished = true @@ -71,16 +76,15 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, nextPhase(runningPhase) listener ! ResumeAccepting(1) val target = self - requester ! StreamTcpManager.BindReply( - localAddress, - primaryOutputs.getExposedPublisher.asInstanceOf[Publisher[StreamTcp.IncomingTcpConnection]], - new Closeable { - override def close() = target ! Unbind - }) + localAddressPromise.success(localAddress) + unbindPromise.success(() ⇒ { target ! Unbind; unboundPromise.future }) + primaryOutputs.getExposedPublisher.subscribe(flowSubscriber.asInstanceOf[Subscriber[Any]]) subreceive.become(running) case f: CommandFailed ⇒ - val ex = new StreamTcp.IncomingTcpException("Bind failed") - requester ! Status.Failure(ex) + val ex = StreamTcp.BindFailedException + localAddressPromise.failure(ex) + unbindPromise.failure(ex) + flowSubscriber.onError(ex) fail(ex) } @@ -89,12 +93,16 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, pendingConnection = (c, sender()) pump() case f: CommandFailed ⇒ - fail(new StreamTcp.IncomingTcpException(s"Command [${f.cmd}] failed")) + val ex = new ConnectionException(s"Command [${f.cmd}] failed") + unbindPromise.tryFailure(ex) + fail(ex) case Unbind ⇒ - cancel() + if (!closed && listener != null) listener ! Unbind + listener = null pump() case Unbound ⇒ // If we're unbound then just shut down - closed = true + cancel() + unboundPromise.trySuccess(()) pump() } @@ -114,7 +122,6 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, listener ! ResumeAccepting(1) elem } - } final override def receive = { @@ -132,7 +139,19 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement() val tcpStreamActor = context.actorOf(TcpStreamActor.inboundProps(connection, settings)) val processor = ActorProcessor[ByteString, ByteString](tcpStreamActor) - primaryOutputs.enqueueOutputElement(StreamTcp.IncomingTcpConnection(connected.remoteAddress, Pipe(() ⇒ processor))) + val conn = new StreamTcp.IncomingConnection { + val flow = Pipe(() ⇒ processor) + def localAddress = connected.localAddress + def remoteAddress = connected.remoteAddress + def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) = + flow.join(handler).run() + } + primaryOutputs.enqueueOutputElement(conn) + } + + override def postStop(): Unit = { + unboundPromise.trySuccess(()) + super.postStop() } def fail(e: Throwable): Unit = {