diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index c147a2aedd..ae78d87341 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -58,7 +58,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E val effectivePort = if (port >= 0) port else if (httpsContext.isEmpty) 80 else 443 val tlsStage = sslTlsStage(httpsContext, Server) val connections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] = - Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, settings.timeouts.idleTimeout) + Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout) connections.map { case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ val layer = serverLayer(settings, Some(remoteAddress), log) @@ -189,7 +189,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E val layer = clientLayer(hostHeader, settings, log) val tlsStage = sslTlsStage(httpsContext, Client) val transportFlow = Tcp().outgoingConnection(new InetSocketAddress(host, port), localAddress, - settings.socketOptions, settings.connectingTimeout, settings.idleTimeout) + settings.socketOptions, halfClose = true, settings.connectingTimeout, settings.idleTimeout) layer.atop(tlsStage).joinMat(transportFlow) { (_, tcpConnFuture) ⇒ import system.dispatcher diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 4011f5f73c..055dd4e71d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -5,6 +5,7 @@ package akka.stream.io import akka.stream.scaladsl.Tcp.OutgoingConnection +import scala.collection.immutable import scala.concurrent.{ Future, Await } import akka.io.Tcp._ @@ -19,7 +20,7 @@ import akka.stream.testkit.Utils._ import akka.stream.scaladsl._ import akka.stream.testkit.TestUtils.temporaryServerAddress -class TcpSpec extends AkkaSpec with TcpHelper { +class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto") with TcpHelper { import akka.stream.io.TcpHelper._ var demand = 0L @@ -341,6 +342,44 @@ class TcpSpec extends AkkaSpec with TcpHelper { server.close() } + "properly full-close if requested" in assertAllStagesStopped { + import system.dispatcher + + val serverAddress = temporaryServerAddress() + val writeButIgnoreRead: Flow[ByteString, ByteString, Unit] = + Flow.wrap(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right) + + val binding = Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ + conn.flow.join(writeButIgnoreRead).run() + })(Keep.left).run() + + val result = Source(() ⇒ Iterator.continually(ByteString("client data"))) + .via(Tcp().outgoingConnection(serverAddress.getHostName, serverAddress.getPort)) + .runFold(ByteString.empty)(_ ++ _) + + Await.result(result, 3.seconds) should ===(ByteString("Early response")) + + binding.map(_.unbind()) + } + + "Echo should work even if server is in full close mode" in { + import system.dispatcher + + val serverAddress = temporaryServerAddress() + + val binding = Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ + conn.flow.join(Flow[ByteString]).run() + })(Keep.left).run() + + val result = Source(immutable.Iterable.fill(10000)(ByteString(0))) + .via(Tcp().outgoingConnection(serverAddress, halfClose = true)) + .runFold(0)(_ + _.size) + + Await.result(result, 3.seconds) should ===(10000) + + binding.map(_.unbind()) + } + } "TCP listen stream" must { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala index 12b1084882..bdf05e2ba2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala @@ -33,6 +33,7 @@ private[akka] object StreamTcpManager { localAddressPromise: Promise[InetSocketAddress], remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress], + halfClose: Boolean, options: immutable.Traversable[SocketOption], connectTimeout: Duration, idleTimeout: Duration) @@ -47,6 +48,7 @@ private[akka] object StreamTcpManager { flowSubscriber: Subscriber[StreamTcp.IncomingConnection], endpoint: InetSocketAddress, backlog: Int, + halfClose: Boolean, options: immutable.Traversable[SocketOption], idleTimeout: Duration) extends DeadLetterSuppression with NoSerializationVerificationNeeded @@ -72,18 +74,18 @@ private[akka] class StreamTcpManager extends Actor { } def receive: Receive = { - case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, connectTimeout, _) ⇒ + case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, halfClose, options, connectTimeout, _) ⇒ val connTimeout = connectTimeout match { case x: FiniteDuration ⇒ Some(x) case _ ⇒ None } - val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, + val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, halfClose, Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true), materializerSettings = ActorFlowMaterializerSettings(context.system)), name = encName("client", remoteAddress)) processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor)) - case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options, _) ⇒ - val props = TcpListenStreamActor.props(localAddressPromise, unbindPromise, flowSubscriber, + case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, halfClose, options, _) ⇒ + val props = TcpListenStreamActor.props(localAddressPromise, unbindPromise, flowSubscriber, halfClose, Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true), ActorFlowMaterializerSettings(context.system)) .withDispatcher(context.props.dispatcher) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index cb1ef5d232..3e5465c784 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -24,19 +24,21 @@ private[akka] object TcpStreamActor { def outboundProps(processorPromise: Promise[Processor[ByteString, ByteString]], localAddressPromise: Promise[InetSocketAddress], + halfClose: Boolean, connectCmd: Connect, materializerSettings: ActorFlowMaterializerSettings): Props = - Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, connectCmd, + Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, halfClose, connectCmd, materializerSettings)).withDispatcher(materializerSettings.dispatcher).withDeploy(Deploy.local) - def inboundProps(connection: ActorRef, settings: ActorFlowMaterializerSettings): Props = - Props(new InboundTcpStreamActor(connection, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local) + def inboundProps(connection: ActorRef, halfClose: Boolean, settings: ActorFlowMaterializerSettings): Props = + Props(new InboundTcpStreamActor(connection, halfClose, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local) + } /** * INTERNAL API */ -private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerSettings) extends Actor +private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerSettings, halfClose: Boolean) extends Actor with ActorLogging { import TcpStreamActor._ @@ -47,6 +49,8 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS val primaryOutputs: Outputs = new SimpleOutputs(self, readPump) + def fullClose: Boolean = !halfClose + object tcpInputs extends DefaultInputTransferStates { private var closed: Boolean = false private var pendingElement: ByteString = null @@ -112,10 +116,13 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS object tcpOutputs extends DefaultOutputTransferStates { private var closed: Boolean = false - private var pendingDemand = true + private var lastWriteAcked = true private var connection: ActorRef = _ def isClosed: Boolean = closed + // Full-close mode needs to wait for the last write Ack before sending Close to avoid doing a connection reset + def isFlushed: Boolean = closed && (halfClose || lastWriteAcked) + private def initialized: Boolean = connection ne null def setConnection(c: ActorRef): Unit = { @@ -128,7 +135,12 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS def handleWrite: Receive = { case WriteAck ⇒ - pendingDemand = true + lastWriteAcked = true + if (fullClose && closed) { + // Finish the closing after the last write has been flushed in full close mode. + connection ! Close + tryShutdown() + } writePump.pump() } @@ -141,9 +153,16 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS override def complete(): Unit = { if (!closed && initialized) { closed = true - if (tcpInputs.isClosed) + if (tcpInputs.isClosed && (halfClose || lastWriteAcked)) { + // We can immediately close if + // - half close mode, and read size already finished + // - full close mode, and last write has been acked + // + // if in full close mode, and has a non-acknowledged write, we will do the closing in handleWrite + // when the Ack arrives connection ! Close - else + tryShutdown() + } else connection ! ConfirmedClose } } @@ -153,10 +172,10 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS override def enqueueOutputElement(elem: Any): Unit = { ReactiveStreamsCompliance.requireNonNullElement(elem) connection ! Write(elem.asInstanceOf[ByteString], WriteAck) - pendingDemand = false + lastWriteAcked = false } - override def demandAvailable: Boolean = pendingDemand + override def demandAvailable: Boolean = lastWriteAcked } object writePump extends Pump { @@ -168,6 +187,12 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS } override protected def pumpFinished(): Unit = { + if (fullClose) { + // In full close mode we shut down the read size immediately once the write side is finished + tcpInputs.cancel() + primaryOutputs.complete() + readPump.pump() + } tcpOutputs.complete() primaryInputs.cancel() tryShutdown() @@ -228,7 +253,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS } def tryShutdown(): Unit = - if (primaryInputs.isClosed && tcpInputs.isClosed && tcpOutputs.isClosed) + if (primaryInputs.isClosed && tcpInputs.isClosed && tcpOutputs.isFlushed) context.stop(self) override def postStop(): Unit = { @@ -245,8 +270,8 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS * INTERNAL API */ private[akka] class InboundTcpStreamActor( - val connection: ActorRef, _settings: ActorFlowMaterializerSettings) - extends TcpStreamActor(_settings) { + val connection: ActorRef, _halfClose: Boolean, _settings: ActorFlowMaterializerSettings) + extends TcpStreamActor(_settings, _halfClose) { connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) tcpInputs.setConnection(connection) @@ -258,8 +283,9 @@ private[akka] class InboundTcpStreamActor( */ private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[ByteString, ByteString]], localAddressPromise: Promise[InetSocketAddress], + _halfClose: Boolean, val connectCmd: Connect, _settings: ActorFlowMaterializerSettings) - extends TcpStreamActor(_settings) { + extends TcpStreamActor(_settings, _halfClose) { import TcpStreamActor._ import context.system diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 1e482a7551..63df82ae4d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -24,8 +24,9 @@ private[akka] object TcpListenStreamActor { def props(localAddressPromise: Promise[InetSocketAddress], unbindPromise: Promise[() ⇒ Future[Unit]], flowSubscriber: Subscriber[StreamTcp.IncomingConnection], + halfClose: Boolean, bindCmd: Tcp.Bind, materializerSettings: ActorFlowMaterializerSettings): Props = { - Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, bindCmd, materializerSettings)) + Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, halfClose, bindCmd, materializerSettings)) .withDeploy(Deploy.local) } } @@ -36,6 +37,7 @@ private[akka] object TcpListenStreamActor { private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocketAddress], unbindPromise: Promise[() ⇒ Future[Unit]], flowSubscriber: Subscriber[StreamTcp.IncomingConnection], + halfClose: Boolean, bindCmd: Tcp.Bind, settings: ActorFlowMaterializerSettings) extends Actor with Pump with ActorLogging { import ReactiveStreamsCompliance._ @@ -141,7 +143,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket def runningPhase = TransferPhase(primaryOutputs.NeedsDemand && incomingConnections.NeedsInput) { () ⇒ val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement() - val tcpStreamActor = context.actorOf(TcpStreamActor.inboundProps(connection, settings)) + val tcpStreamActor = context.actorOf(TcpStreamActor.inboundProps(connection, halfClose, settings)) val processor = ActorProcessor[ByteString, ByteString](tcpStreamActor) val conn = StreamTcp.IncomingConnection( connected.localAddress, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index 3a08e91213..7f94cf442f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -99,13 +99,28 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. + * + * @param interface The interface to listen on + * @param port The port to listen on + * @param backlog Controls the size of the connection backlog + * @param options TCP options for the connections, see [[akka.io.Tcp]] for details + * @param halfClose + * Controls whether the connection is kept open even after writing has been completed to the accepted + * TCP connections. + * If set to true, the connection will implement the TCP half-close mechanism, allowing the client to + * write to the connection even after the server has finished writing. The TCP socket is only closed + * after both the client and server finished writing. + * If set to false, the connection will immediately closed once the server closes its write side, + * independently whether the client is still attempting to write. This setting is recommended + * for servers, and therefore it is the default setting. */ def bind(interface: String, port: Int, backlog: Int, options: JIterable[SocketOption], + halfClose: Boolean, idleTimeout: Duration): Source[IncomingConnection, Future[ServerBinding]] = - Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), idleTimeout) + Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), halfClose, idleTimeout) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) @@ -120,13 +135,27 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. + * + * @param remoteAddress The remote address to connect to + * @param localAddress Optional local address for the connection + * @param options TCP options for the connections, see [[akka.io.Tcp]] for details + * @param halfClose + * Controls whether the connection is kept open even after writing has been completed to the accepted + * TCP connections. + * If set to true, the connection will implement the TCP half-close mechanism, allowing the server to + * write to the connection even after the client has finished writing. The TCP socket is only closed + * after both the client and server finished writing. This setting is recommended for clients and + * therefore it is the default setting. + * If set to false, the connection will immediately closed once the client closes its write side, + * independently whether the server is still attempting to write. */ def outgoingConnection(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress], options: JIterable[SocketOption], + halfClose: Boolean, connectTimeout: Duration, idleTimeout: Duration): Flow[ByteString, ByteString, Future[OutgoingConnection]] = - Flow.adapt(delegate.outgoingConnection(remoteAddress, localAddress, immutableSeq(options), connectTimeout, idleTimeout) + Flow.adapt(delegate.outgoingConnection(remoteAddress, localAddress, immutableSeq(options), halfClose, connectTimeout, idleTimeout) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 835e60bbf8..a0aedf19b0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -76,6 +76,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { val endpoint: InetSocketAddress, val backlog: Int, val options: immutable.Traversable[SocketOption], + val halfClose: Boolean, val idleTimeout: Duration = Duration.Inf, val attributes: OperationAttributes, _shape: SourceShape[IncomingConnection]) extends SourceModule[IncomingConnection, Future[ServerBinding]](_shape) { @@ -93,6 +94,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { s.asInstanceOf[Subscriber[IncomingConnection]], endpoint, backlog, + halfClose, options, idleTimeout) } @@ -109,41 +111,90 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { } override protected def newInstance(s: SourceShape[IncomingConnection]): SourceModule[IncomingConnection, Future[ServerBinding]] = - new BindSource(endpoint, backlog, options, idleTimeout, attributes, shape) + new BindSource(endpoint, backlog, options, halfClose, idleTimeout, attributes, shape) override def withAttributes(attr: OperationAttributes): Module = - new BindSource(endpoint, backlog, options, idleTimeout, attr, shape) + new BindSource(endpoint, backlog, options, halfClose, idleTimeout, attr, shape) } /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. + * + * @param interface The interface to listen on + * @param port The port to listen on + * @param backlog Controls the size of the connection backlog + * @param options TCP options for the connections, see [[akka.io.Tcp]] for details + * @param halfClose + * Controls whether the connection is kept open even after writing has been completed to the accepted + * TCP connections. + * If set to true, the connection will implement the TCP half-close mechanism, allowing the client to + * write to the connection even after the server has finished writing. The TCP socket is only closed + * after both the client and server finished writing. + * If set to false, the connection will immediately closed once the server closes its write side, + * independently whether the client is still attempting to write. This setting is recommended + * for servers, and therefore it is the default setting. */ def bind(interface: String, port: Int, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil, + halfClose: Boolean = false, idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = { - new Source(new BindSource(new InetSocketAddress(interface, port), backlog, options, idleTimeout, + new Source(new BindSource(new InetSocketAddress(interface, port), backlog, options, halfClose, idleTimeout, OperationAttributes.none, SourceShape(new Outlet("BindSource.out")))) } + /** + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` + * handling the incoming connections using the provided Flow. + * + * @param handler A Flow that represents the server logic + * @param interface The interface to listen on + * @param port The port to listen on + * @param backlog Controls the size of the connection backlog + * @param options TCP options for the connections, see [[akka.io.Tcp]] for details + * @param halfClose + * Controls whether the connection is kept open even after writing has been completed to the accepted + * TCP connections. + * If set to true, the connection will implement the TCP half-close mechanism, allowing the client to + * write to the connection even after the server has finished writing. The TCP socket is only closed + * after both the client and server finished writing. + * If set to false, the connection will immediately closed once the server closes its write side, + * independently whether the client is still attempting to write. This setting is recommended + * for servers, and therefore it is the default setting. + */ def bindAndHandle( handler: Flow[ByteString, ByteString, _], interface: String, port: Int, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil, + halfClose: Boolean = false, idleTimeout: Duration = Duration.Inf)(implicit m: FlowMaterializer): Future[ServerBinding] = { - bind(interface, port, backlog, options, idleTimeout).to(Sink.foreach { conn: IncomingConnection ⇒ + bind(interface, port, backlog, options, halfClose, idleTimeout).to(Sink.foreach { conn: IncomingConnection ⇒ conn.flow.join(handler).run() }).run() } /** * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. + * + * @param remoteAddress The remote address to connect to + * @param localAddress Optional local address for the connection + * @param options TCP options for the connections, see [[akka.io.Tcp]] for details + * @param halfClose + * Controls whether the connection is kept open even after writing has been completed to the accepted + * TCP connections. + * If set to true, the connection will implement the TCP half-close mechanism, allowing the server to + * write to the connection even after the client has finished writing. The TCP socket is only closed + * after both the client and server finished writing. This setting is recommended for clients and + * therefore it is the default setting. + * If set to false, the connection will immediately closed once the client closes its write side, + * independently whether the server is still attempting to write. */ def outgoingConnection(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil, + halfClose: Boolean = true, connectTimeout: Duration = Duration.Inf, idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { @@ -152,7 +203,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { Flow[ByteString].andThenMat(() ⇒ { val processorPromise = Promise[Processor[ByteString, ByteString]]() val localAddressPromise = Promise[InetSocketAddress]() - manager ! StreamTcpManager.Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, + manager ! StreamTcpManager.Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, halfClose, options, connectTimeout, idleTimeout) import system.dispatcher val outgoingConnection = localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))