From 99a9c5964e51b2651e50f9333e9f6026ef344aa1 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Thu, 22 Oct 2015 15:29:12 +0200 Subject: [PATCH 1/2] =str #18763 remove unused parameter in PoolConductor --- .../scala/akka/http/impl/engine/client/PoolConductor.scala | 6 +++--- .../main/scala/akka/http/impl/engine/client/PoolFlow.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala index a5d5a9dba8..feb9809891 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala @@ -64,12 +64,12 @@ private object PoolConductor { +---------+ */ - def apply(slotCount: Int, maxRetries: Int, pipeliningLimit: Int, log: LoggingAdapter): Graph[Ports, Any] = + def apply(slotCount: Int, pipeliningLimit: Int, log: LoggingAdapter): Graph[Ports, Any] = FlowGraph.create() { implicit b ⇒ import FlowGraph.Implicits._ val retryMerge = b.add(MergePreferred[RequestContext](1, eagerClose = true)) - val slotSelector = b.add(new SlotSelector(slotCount, maxRetries, pipeliningLimit, log)) + val slotSelector = b.add(new SlotSelector(slotCount, pipeliningLimit, log)) val route = b.add(new Route(slotCount)) val retrySplit = b.add(Broadcast[RawSlotEvent](2)) val flatten = Flow[RawSlotEvent].mapAsyncUnordered(slotCount) { @@ -106,7 +106,7 @@ private object PoolConductor { private case class Busy(openRequests: Int) extends SlotState { require(openRequests > 0) } private object Busy extends Busy(1) - private class SlotSelector(slotCount: Int, maxRetries: Int, pipeliningLimit: Int, log: LoggingAdapter) + private class SlotSelector(slotCount: Int, pipeliningLimit: Int, log: LoggingAdapter) extends GraphStage[FanInShape2[RequestContext, SlotEvent, SwitchCommand]] { private val ctxIn = Inlet[RequestContext]("requestContext") diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala index a6d00908a7..42aea1c9c2 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala @@ -74,7 +74,7 @@ private object PoolFlow { import settings._ import FlowGraph.Implicits._ - val conductor = b.add(PoolConductor(maxConnections, maxRetries, pipeliningLimit, log)) + val conductor = b.add(PoolConductor(maxConnections, pipeliningLimit, log)) val slots = Vector .tabulate(maxConnections)(PoolSlot(_, connectionFlow, remoteAddress, settings)) .map(b.add(_)) From 2c2228c2414472e6941bd8d737c47df49e240226 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Fri, 23 Oct 2015 06:41:02 -0700 Subject: [PATCH 2/2] =str #16597 initial steps with idleTimeout --- .../engine/server/HttpServerBluePrint.scala | 37 +++++++--------- .../akka/http/impl/util/StreamUtils.scala | 19 +++++---- .../main/scala/akka/http/scaladsl/Http.scala | 6 +-- .../server/HttpServerTestSetupBase.scala | 6 ++- .../impl/engine/ws/ByteStringSinkProbe.scala | 4 +- .../akka/http/scaladsl/ClientServerSpec.scala | 42 ++++++++++++++++++- .../http/scaladsl/server/TestServer.scala | 1 + .../stream/impl/io/StreamTcpManager.scala | 8 ++-- .../stream/impl/io/TcpConnectionStream.scala | 5 ++- .../stream/impl/io/TcpListenStreamActor.scala | 31 +++++++++----- .../main/scala/akka/stream/io/Timeouts.scala | 5 +-- .../main/scala/akka/stream/scaladsl/Tcp.scala | 32 +++++++------- 12 files changed, 124 insertions(+), 72 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index efad72e9e7..f7feddcdbf 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -7,31 +7,26 @@ package akka.http.impl.engine.server import java.net.InetSocketAddress import java.util.Random -import akka.http.ServerSettings -import akka.http.scaladsl.model.ws.Message -import akka.stream.io._ -import org.reactivestreams.{ Subscriber, Publisher } -import scala.util.control.NonFatal -import akka.util.ByteString +import akka.actor.{ ActorRef, Deploy, Props } import akka.event.LoggingAdapter -import akka.actor.{ Deploy, ActorRef, Props } -import akka.stream._ -import akka.stream.scaladsl._ -import akka.stream.stage.PushPullStage -import akka.http.impl.engine.parsing._ -import akka.http.impl.engine.rendering.{ ResponseRenderingOutput, ResponseRenderingContext, HttpResponseRendererFactory } +import akka.http.ServerSettings import akka.http.impl.engine.TokenSourceActor +import akka.http.impl.engine.parsing.ParserOutput._ +import akka.http.impl.engine.parsing._ +import akka.http.impl.engine.rendering.{ HttpResponseRendererFactory, ResponseRenderingContext, ResponseRenderingOutput } +import akka.http.impl.engine.ws.Websocket.SwitchToWebsocketToken +import akka.http.impl.engine.ws._ +import akka.http.impl.util._ import akka.http.scaladsl.Http import akka.http.scaladsl.model._ -import akka.http.impl.util._ -import akka.http.impl.engine.ws._ -import Websocket.SwitchToWebsocketToken -import ParserOutput._ -import akka.stream.stage.GraphStage -import akka.stream.stage.GraphStageLogic -import akka.stream.stage.OutHandler -import akka.stream.stage.InHandler -import akka.http.impl.engine.rendering.ResponseRenderingContext +import akka.stream._ +import akka.stream.io._ +import akka.stream.scaladsl._ +import akka.stream.stage._ +import akka.util.ByteString +import org.reactivestreams.{ Publisher, Subscriber } + +import scala.util.control.NonFatal /** * INTERNAL API diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index baf25102ca..d4c22aec04 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -4,18 +4,19 @@ package akka.http.impl.util -import java.io.InputStream -import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } -import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.{ SourceModule, SinkModule, PublisherSink } -import org.reactivestreams.{ Subscription, Processor, Subscriber, Publisher } -import scala.collection.immutable -import scala.concurrent.{ Promise, ExecutionContext, Future } -import akka.util.ByteString +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } + import akka.http.scaladsl.model.RequestEntity import akka.stream._ +import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule } import akka.stream.scaladsl._ import akka.stream.stage._ +import akka.util.ByteString +import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } + +import scala.collection.immutable +import scala.concurrent.{ ExecutionContext, Future, Promise } /** * INTERNAL API @@ -305,7 +306,7 @@ private[http] object StreamUtils { } /** - * Similar to Source.lazyEmpty but doesn't rely on materialization. Can only be used once. + * Similar to Source.maybe but doesn't rely on materialization. Can only be used once. */ trait OneTimeValve { def source[T]: Source[T, Unit] diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index c284afb2a5..7567279588 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -5,9 +5,8 @@ package akka.http.scaladsl import java.net.InetSocketAddress -import java.security.SecureRandom import java.util.concurrent.ConcurrentHashMap -import java.util.{ Collection ⇒ JCollection, Random } +import java.util.{ Collection ⇒ JCollection } import javax.net.ssl.{ SSLContext, SSLParameters } import akka.actor._ @@ -15,8 +14,8 @@ import akka.event.LoggingAdapter import akka.http._ import akka.http.impl.engine.client._ import akka.http.impl.engine.server._ -import akka.http.impl.util.{ ReadTheDocumentationException, Java6Compat, StreamUtils } import akka.http.impl.engine.ws.WebsocketClientBlueprint +import akka.http.impl.util.{ Java6Compat, ReadTheDocumentationException, StreamUtils } import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Host import akka.http.scaladsl.model.ws.{ WebsocketUpgradeResponse, WebsocketRequest, Message } @@ -25,7 +24,6 @@ import akka.japi import akka.stream.Materializer import akka.stream.io._ import akka.stream.scaladsl._ -import akka.util.ByteString import com.typesafe.config.Config import scala.collection.immutable diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala index 37bef87e1e..dd279f9717 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala @@ -10,6 +10,7 @@ import akka.http.impl.engine.ws.ByteStringSinkProbe import akka.stream.io.{ SendBytes, SslTlsOutbound, SessionBytes } import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.event.NoLogging @@ -32,7 +33,8 @@ abstract class HttpServerTestSetupBase { val requests = TestSubscriber.probe[HttpRequest] val responses = TestPublisher.probe[HttpResponse]() - def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test"))))) + def settings = ServerSettings(system) + .copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test"))))) def remoteAddress: Option[InetSocketAddress] = None val (netIn, netOut) = { @@ -68,6 +70,8 @@ abstract class HttpServerTestSetupBase { def expectRequest: HttpRequest = requests.requestNext() def expectNoRequest(max: FiniteDuration): Unit = requests.expectNoMsg(max) + def expectSubscribe(): Unit = netOut.expectComplete() + def expectSubscribeAndNetworkClose(): Unit = netOut.expectSubscriptionAndComplete() def expectNetworkClose(): Unit = netOut.expectComplete() def send(data: ByteString): Unit = netIn.sendNext(data) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/ByteStringSinkProbe.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/ByteStringSinkProbe.scala index 9d793c28a9..b1ea76ffd1 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/ByteStringSinkProbe.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/ByteStringSinkProbe.scala @@ -5,7 +5,7 @@ package akka.http.impl.engine.ws import akka.actor.ActorSystem -import akka.stream.scaladsl.{ Source, Sink } +import akka.stream.scaladsl.Sink import akka.stream.testkit.TestSubscriber import akka.util.ByteString @@ -23,6 +23,7 @@ trait ByteStringSinkProbe { def expectNoBytes(): Unit def expectNoBytes(timeout: FiniteDuration): Unit + def expectSubscriptionAndComplete(): Unit def expectComplete(): Unit def expectError(): Throwable def expectError(cause: Throwable): Unit @@ -62,6 +63,7 @@ object ByteStringSinkProbe { def expectUtf8EncodedString(string: String): Unit = expectBytes(ByteString(string, "utf8")) + def expectSubscriptionAndComplete(): Unit = probe.expectSubscriptionAndComplete() def expectComplete(): Unit = probe.expectComplete() def expectError(): Throwable = probe.expectError() def expectError(cause: Throwable): Unit = probe.expectError(cause) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 3f6e69ce1c..5fc481a5f7 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2014 Typesafe Inc. + * Copyright (C) 2009-2015 Typesafe Inc. */ package akka.http.scaladsl @@ -8,6 +8,7 @@ import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStream import java.net.Socket import akka.http.scaladsl.Http.ServerBinding import akka.http.{ ClientConnectionSettings, ServerSettings } +import akka.util.ByteString import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec import scala.concurrent.{ Promise, Future, Await } @@ -24,7 +25,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ import akka.http.impl.util._ -import scala.util.Success +import scala.util.{ Failure, Try, Success } class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" @@ -138,6 +139,43 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { Await.result(b1.unbind(), 1.second) } + "close connection with idle client after idleTimeout" in { + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val s = ServerSettings(system) + val theIdleTimeout = 300.millis + val settings = s.copy(timeouts = s.timeouts.copy(idleTimeout = theIdleTimeout)) + + val receivedRequest = Promise[Long]() + + def handle(req: HttpRequest): Future[HttpResponse] = { + receivedRequest.complete(Success(System.nanoTime())) + Promise().future // never complete the request with a response; 're waiting for the timeout to happen, nothing else + } + + val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings) + val b1 = Await.result(binding, 3.seconds) + + def runIdleRequest(uri: Uri): Future[HttpResponse] = { + val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) + Http().outgoingConnection(hostname, port) + .runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head) + ._2 + } + + val clientsResponseFuture = runIdleRequest("/") + + // await for the server to get the request + val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + + // waiting for the timeout to happen on the client + Try(Await.result(clientsResponseFuture, 2.second)).recoverWith { + case _: StreamTcpException ⇒ Success(System.nanoTime()) + case other: Throwable ⇒ Failure(other) + }.get + val diff = System.nanoTime() - serverReceivedRequestAtNanos + diff should be > theIdleTimeout.toNanos + } + "log materialization errors in `bindAndHandle`" which { "are triggered in `transform`" in Utils.assertAllStagesStopped { diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala index 237c8f70c0..604caee07e 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala @@ -6,6 +6,7 @@ package akka.http.scaladsl.server import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport import akka.http.scaladsl.server.directives.Credentials +import akka.stream.scaladsl._ import com.typesafe.config.{ ConfigFactory, Config } import akka.actor.ActorSystem import akka.stream.ActorMaterializer diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala index 7cb9ca3790..18ab1a3b61 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala @@ -74,18 +74,18 @@ private[akka] class StreamTcpManager extends Actor { } def receive: Receive = { - case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, halfClose, options, connectTimeout, _) ⇒ + case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, halfClose, options, connectTimeout, idleTimeout) ⇒ val connTimeout = connectTimeout match { case x: FiniteDuration ⇒ Some(x) case _ ⇒ None } - val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, halfClose, + val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, halfClose, idleTimeout, Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true), materializerSettings = ActorMaterializerSettings(context.system)), name = encName("client", remoteAddress)) processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor)) - case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, halfClose, options, _) ⇒ - val props = TcpListenStreamActor.props(localAddressPromise, unbindPromise, flowSubscriber, halfClose, + case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, halfClose, options, idleTimeout) ⇒ + val props = TcpListenStreamActor.props(localAddressPromise, unbindPromise, flowSubscriber, halfClose, idleTimeout, Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true), ActorMaterializerSettings(context.system)) .withDispatcher(context.props.dispatcher) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 5fc1084479..cf0b0b887c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -13,6 +13,7 @@ import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings, Stre import org.reactivestreams.Processor import akka.stream.impl._ +import scala.concurrent.duration.Duration import scala.util.control.NoStackTrace /** @@ -24,9 +25,10 @@ private[akka] object TcpStreamActor { def outboundProps(processorPromise: Promise[Processor[ByteString, ByteString]], localAddressPromise: Promise[InetSocketAddress], halfClose: Boolean, + idleTimeout: Duration, connectCmd: Connect, materializerSettings: ActorMaterializerSettings): Props = - Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, halfClose, connectCmd, + Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, halfClose, idleTimeout, connectCmd, materializerSettings)).withDispatcher(materializerSettings.dispatcher).withDeploy(Deploy.local) def inboundProps(connection: ActorRef, halfClose: Boolean, settings: ActorMaterializerSettings): Props = @@ -302,6 +304,7 @@ private[akka] class InboundTcpStreamActor( private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[ByteString, ByteString]], localAddressPromise: Promise[InetSocketAddress], _halfClose: Boolean, + idleTimeout: Duration, val connectCmd: Connect, _settings: ActorMaterializerSettings) extends TcpStreamActor(_settings, _halfClose) { import context.system diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 467f2f2132..8af8d3a578 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -4,18 +4,19 @@ package akka.stream.impl.io import java.net.InetSocketAddress -import scala.concurrent.{ Future, Promise } + import akka.actor._ -import akka.io.{ IO, Tcp } import akka.io.Tcp._ -import akka.stream.{ Materializer, ActorMaterializerSettings } +import akka.io.{ IO, Tcp } import akka.stream.impl._ -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.{ Tcp ⇒ StreamTcp } +import akka.stream.io.Timeouts +import akka.stream.scaladsl.{ Flow, Tcp ⇒ StreamTcp } +import akka.stream.{ ActorMaterializerSettings, BindFailedException, ConnectionException } import akka.util.ByteString import org.reactivestreams.Subscriber -import akka.stream.ConnectionException -import akka.stream.BindFailedException + +import scala.concurrent.duration.Duration +import scala.concurrent.{ Future, Promise } /** * INTERNAL API @@ -25,8 +26,9 @@ private[akka] object TcpListenStreamActor { unbindPromise: Promise[() ⇒ Future[Unit]], flowSubscriber: Subscriber[StreamTcp.IncomingConnection], halfClose: Boolean, + idleTimeout: Duration, bindCmd: Tcp.Bind, materializerSettings: ActorMaterializerSettings): Props = { - Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, halfClose, bindCmd, materializerSettings)) + Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, halfClose, bindCmd, idleTimeout, materializerSettings)) .withDeploy(Deploy.local) } } @@ -38,7 +40,9 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket unbindPromise: Promise[() ⇒ Future[Unit]], flowSubscriber: Subscriber[StreamTcp.IncomingConnection], halfClose: Boolean, - bindCmd: Tcp.Bind, settings: ActorMaterializerSettings) extends Actor + bindCmd: Tcp.Bind, + idleTimeout: Duration, + settings: ActorMaterializerSettings) extends Actor with Pump with ActorLogging { import ReactiveStreamsCompliance._ import context.system @@ -151,10 +155,17 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement() val tcpStreamActor = context.watch(context.actorOf(TcpStreamActor.inboundProps(connection, halfClose, settings))) val processor = ActorProcessor[ByteString, ByteString](tcpStreamActor) + + import scala.concurrent.duration._ + val handler = (idleTimeout match { + case d: FiniteDuration ⇒ Flow[ByteString].join(Timeouts.idleTimeoutBidi[ByteString, ByteString](d)) + case _ ⇒ Flow[ByteString] + }).andThenMat(() ⇒ (processor, ())) + val conn = StreamTcp.IncomingConnection( connected.localAddress, connected.remoteAddress, - Flow[ByteString].andThenMat(() ⇒ (processor, ()))) + handler) primaryOutputs.enqueueOutputElement(conn) } diff --git a/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala b/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala index aed5dc375b..52e1c69a7d 100644 --- a/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala +++ b/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala @@ -2,11 +2,10 @@ package akka.stream.io import java.util.concurrent.{ TimeUnit, TimeoutException } -import akka.actor.{ Cancellable, ActorSystem } import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.{ FlowShape, Outlet, Inlet, BidiShape } import akka.stream.scaladsl.{ BidiFlow, Flow } -import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } +import akka.stream.stage._ +import akka.stream.{ BidiShape, Inlet, Outlet } import scala.concurrent.duration.{ Deadline, FiniteDuration } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 971ab51de4..125b208242 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -3,26 +3,23 @@ */ package akka.stream.scaladsl -import java.net.{ InetSocketAddress, URLEncoder } -import akka.stream.impl.StreamLayout.Module -import scala.collection.immutable -import scala.concurrent.{ Promise, ExecutionContext, Future } -import scala.concurrent.duration.Duration -import scala.util.{ Failure, Success } -import scala.util.control.NoStackTrace +import java.net.InetSocketAddress + import akka.actor._ import akka.io.Inet.SocketOption import akka.io.{ Tcp ⇒ IoTcp } import akka.stream._ -import akka.stream.impl._ import akka.stream.impl.ReactiveStreamsCompliance._ -import akka.stream.scaladsl._ +import akka.stream.impl.StreamLayout.Module +import akka.stream.impl._ +import akka.stream.impl.io.{ DelayedInitProcessor, StreamTcpManager } +import akka.stream.io.Timeouts import akka.util.ByteString -import org.reactivestreams.{ Publisher, Processor, Subscriber, Subscription } -import akka.stream.impl.io.TcpStreamActor -import akka.stream.impl.io.TcpListenStreamActor -import akka.stream.impl.io.DelayedInitProcessor -import akka.stream.impl.io.StreamTcpManager +import org.reactivestreams.{ Processor, Publisher, Subscriber } + +import scala.collection.immutable +import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.{ Future, Promise } object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { @@ -206,7 +203,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { connectTimeout: Duration = Duration.Inf, idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { - val remoteAddr = remoteAddress + val timeoutHandling = idleTimeout match { + case d: FiniteDuration ⇒ Flow[ByteString].join(Timeouts.idleTimeoutBidi[ByteString, ByteString](d)) + case _ ⇒ Flow[ByteString] + } Flow[ByteString].andThenMat(() ⇒ { val processorPromise = Promise[Processor[ByteString, ByteString]]() @@ -216,7 +216,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { import system.dispatcher val outgoingConnection = localAddressPromise.future.map(OutgoingConnection(remoteAddress, _)) (new DelayedInitProcessor[ByteString, ByteString](processorPromise.future), outgoingConnection) - }) + }).via(timeoutHandling) }