diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index e8db1d88c2..3137663624 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -13,9 +13,9 @@ import scala.concurrent.Future import scala.concurrent.duration._ import scala.collection.immutable import akka.util.ByteString -import akka.stream.{ ActorMaterializer, Materializer } +import akka.stream.Materializer import akka.stream.scaladsl._ -import akka.stream.io.{ Timeouts, SynchronousFileSource } +import akka.stream.io.SynchronousFileSource import akka.{ japi, stream } import akka.http.scaladsl.util.FastFuture import akka.http.javadsl.{ model ⇒ jm } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TimeoutsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala similarity index 84% rename from akka-stream-tests/src/test/scala/akka/stream/io/TimeoutsSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala index 38b6637686..de6fad3546 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TimeoutsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala @@ -2,17 +2,17 @@ * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.stream.io +package akka.stream.impl import java.util.concurrent.TimeoutException -import akka.stream.{ ClosedShape, ActorMaterializer } import akka.stream.scaladsl._ -import akka.stream.testkit.{ AkkaSpec, TestSubscriber, TestPublisher } -import scala.concurrent.{ Future, Await } -import scala.concurrent.duration._ - import akka.stream.testkit.Utils._ +import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } +import akka.stream.{ ActorMaterializer, ClosedShape } + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } class TimeoutsSpec extends AkkaSpec { implicit val mat = ActorMaterializer() @@ -21,16 +21,18 @@ class TimeoutsSpec extends AkkaSpec { "pass through elements unmodified" in assertAllStagesStopped { Await.result( - Source(1 to 100).via(Timeouts.initalTimeout(2.seconds)).grouped(200).runWith(Sink.head), + Source(1 to 100).initialTimeout(2.seconds).grouped(200).runWith(Sink.head), 3.seconds) should ===(1 to 100) } "pass through error unmodified" in assertAllStagesStopped { a[TE] shouldBe thrownBy { Await.result( - Source(1 to 100).concat(Source.failed(TE("test"))) - .via(Timeouts.initalTimeout(2.seconds)) - .grouped(200).runWith(Sink.head), + Source(1 to 100) + .concat(Source.failed(TE("test"))) + .initialTimeout(2.seconds) + .grouped(200) + .runWith(Sink.head), 3.seconds) } } @@ -38,7 +40,7 @@ class TimeoutsSpec extends AkkaSpec { "fail if no initial element passes until timeout" in assertAllStagesStopped { val downstreamProbe = TestSubscriber.probe[Int]() Source.maybe[Int] - .via(Timeouts.initalTimeout(1.seconds)) + .initialTimeout(1.second) .runWith(Sink(downstreamProbe)) downstreamProbe.expectSubscription() @@ -54,7 +56,7 @@ class TimeoutsSpec extends AkkaSpec { "pass through elements unmodified" in assertAllStagesStopped { Await.result( - Source(1 to 100).via(Timeouts.completionTimeout(2.seconds)).grouped(200).runWith(Sink.head), + Source(1 to 100).completionTimeout(2.seconds).grouped(200).runWith(Sink.head), 3.seconds) should ===(1 to 100) } @@ -62,7 +64,7 @@ class TimeoutsSpec extends AkkaSpec { a[TE] shouldBe thrownBy { Await.result( Source(1 to 100).concat(Source.failed(TE("test"))) - .via(Timeouts.completionTimeout(2.seconds)) + .completionTimeout(2.seconds) .grouped(200).runWith(Sink.head), 3.seconds) } @@ -72,7 +74,7 @@ class TimeoutsSpec extends AkkaSpec { val upstreamProbe = TestPublisher.probe[Int]() val downstreamProbe = TestSubscriber.probe[Int]() Source(upstreamProbe) - .via(Timeouts.completionTimeout(2.seconds)) + .completionTimeout(2.seconds) .runWith(Sink(downstreamProbe)) upstreamProbe.sendNext(1) @@ -93,7 +95,7 @@ class TimeoutsSpec extends AkkaSpec { "pass through elements unmodified" in assertAllStagesStopped { Await.result( - Source(1 to 100).via(Timeouts.idleTimeout(2.seconds)).grouped(200).runWith(Sink.head), + Source(1 to 100).idleTimeout(2.seconds).grouped(200).runWith(Sink.head), 3.seconds) should ===(1 to 100) } @@ -101,7 +103,7 @@ class TimeoutsSpec extends AkkaSpec { a[TE] shouldBe thrownBy { Await.result( Source(1 to 100).concat(Source.failed(TE("test"))) - .via(Timeouts.idleTimeout(2.seconds)) + .idleTimeout(2.seconds) .grouped(200).runWith(Sink.head), 3.seconds) } @@ -111,7 +113,7 @@ class TimeoutsSpec extends AkkaSpec { val upstreamProbe = TestPublisher.probe[Int]() val downstreamProbe = TestSubscriber.probe[Int]() Source(upstreamProbe) - .via(Timeouts.idleTimeout(1.seconds)) + .idleTimeout(1.seconds) .runWith(Sink(downstreamProbe)) // Two seconds in overall, but won't timeout until time between elements is large enough @@ -131,7 +133,7 @@ class TimeoutsSpec extends AkkaSpec { "IdleTimeoutBidi" must { "not signal error in simple loopback case and pass through elements unmodified" in assertAllStagesStopped { - val timeoutIdentity = Timeouts.idleTimeoutBidi[Int, Int](2.seconds).join(Flow[Int]) + val timeoutIdentity = BidiFlow.bidirectionalIdleTimeout[Int, Int](2.seconds).join(Flow[Int]) Await.result( Source(1 to 100).via(timeoutIdentity).grouped(200).runWith(Sink.head), @@ -146,7 +148,7 @@ class TimeoutsSpec extends AkkaSpec { val downstream = Flow.fromSinkAndSourceMat(Sink.ignore, Source(downstreamWriter))(Keep.left) val assembly: RunnableGraph[(Future[Unit], Future[Unit])] = upstream - .joinMat(Timeouts.idleTimeoutBidi[Int, String](2.seconds))(Keep.left) + .joinMat(BidiFlow.bidirectionalIdleTimeout[Int, String](2.seconds))(Keep.left) .joinMat(downstream)(Keep.both) val (upFinished, downFinished) = assembly.run() @@ -174,7 +176,7 @@ class TimeoutsSpec extends AkkaSpec { RunnableGraph.fromGraph(FlowGraph.create() { implicit b ⇒ import FlowGraph.Implicits._ - val timeoutStage = b.add(Timeouts.idleTimeoutBidi[String, Int](2.seconds)) + val timeoutStage = b.add(BidiFlow.bidirectionalIdleTimeout[String, Int](2.seconds)) Source(upWrite) ~> timeoutStage.in1; timeoutStage.out1 ~> Sink(downRead) Sink(upRead) <~ timeoutStage.out2; @@ -222,7 +224,7 @@ class TimeoutsSpec extends AkkaSpec { RunnableGraph.fromGraph(FlowGraph.create() { implicit b ⇒ import FlowGraph.Implicits._ - val timeoutStage = b.add(Timeouts.idleTimeoutBidi[String, Int](2.seconds)) + val timeoutStage = b.add(BidiFlow.bidirectionalIdleTimeout[String, Int](2.seconds)) Source(upWrite) ~> timeoutStage.in1; timeoutStage.out1 ~> Sink(downRead) Sink(upRead) <~ timeoutStage.out2; diff --git a/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala b/akka-stream/src/main/scala/akka/stream/impl/Timeouts.scala similarity index 69% rename from akka-stream/src/main/scala/akka/stream/io/Timeouts.scala rename to akka-stream/src/main/scala/akka/stream/impl/Timeouts.scala index fe01877147..7744619ff6 100644 --- a/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timeouts.scala @@ -1,15 +1,16 @@ -package akka.stream.io +package akka.stream.impl import java.util.concurrent.{ TimeUnit, TimeoutException } import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.scaladsl.{ BidiFlow, Flow } -import akka.stream.stage._ import akka.stream.{ BidiShape, Inlet, Outlet, Attributes } +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic } import scala.concurrent.duration.{ Deadline, FiniteDuration } /** + * INTERNAL API + * * Various stages for controlling timeouts on IO related streams (although not necessarily). * * The common theme among the processing stages here that @@ -18,41 +19,7 @@ import scala.concurrent.duration.{ Deadline, FiniteDuration } * - if the timer fires before the event happens, these stages all fail the stream * - otherwise, these streams do not interfere with the element flow, ordinary completion or failure */ -object Timeouts { - - /** - * If the first element has not passed through this stage before the provided timeout, the stream is failed - * with a [[TimeoutException]]. - */ - def initalTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] = - Flow.fromGraph(new InitialTimeout[T](timeout)) - - /** - * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[TimeoutException]]. - */ - def completionTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] = - Flow.fromGraph(new CompletionTimeout[T](timeout)) - - /** - * If the time between two processed elements exceed the provided timeout, the stream is failed - * with a [[TimeoutException]]. - */ - def idleTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] = - Flow.fromGraph(new IdleTimeout[T](timeout)) - - /** - * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed - * with a [[TimeoutException]]. - * - * There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage. - * If the timout is configured to be 1 seconds, then this stage will not fail even though there are elements flowing - * every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers - * the *joint* frequencies of the elements in both directions. - */ - def idleTimeoutBidi[A, B](timeout: FiniteDuration): BidiFlow[A, A, B, B, Unit] = - BidiFlow.fromGraph(new IdleTimeoutBidi[A, B](timeout)) - +private[stream] object Timeouts { private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { import scala.concurrent.duration._ FiniteDuration( @@ -60,8 +27,7 @@ object Timeouts { TimeUnit.NANOSECONDS) } - private class InitialTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { - + final class Initial[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { private var initialHasPassed = false @@ -86,7 +52,7 @@ object Timeouts { override def toString = "InitialTimeoutTimer" } - private class CompletionTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + final class Completion[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { setHandler(in, new InHandler { @@ -106,7 +72,7 @@ object Timeouts { override def toString = "CompletionTimeout" } - private class IdleTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { private var nextDeadline: Deadline = Deadline.now + timeout override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { @@ -131,7 +97,7 @@ object Timeouts { override def toString = "IdleTimeout" } - private class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { + final class IdleBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { val in1 = Inlet[I]("in1") val in2 = Inlet[O]("in2") val out1 = Outlet[I]("out1") diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 3c25bc79e6..b304c8cc85 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -9,8 +9,7 @@ import akka.actor._ import akka.io.Tcp._ import akka.io.{ IO, Tcp } import akka.stream.impl._ -import akka.stream.io.Timeouts -import akka.stream.scaladsl.{ Flow, Tcp ⇒ StreamTcp } +import akka.stream.scaladsl.{ Tcp ⇒ StreamTcp, BidiFlow, Flow } import akka.stream.{ ActorMaterializerSettings, BindFailedException, ConnectionException } import akka.util.ByteString import org.reactivestreams.Subscriber @@ -158,7 +157,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket import scala.concurrent.duration.FiniteDuration val handler = (idleTimeout match { - case d: FiniteDuration ⇒ Flow[ByteString].join(Timeouts.idleTimeoutBidi[ByteString, ByteString](d)) + case d: FiniteDuration ⇒ Flow[ByteString].join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d)) case _ ⇒ Flow[ByteString] }).via(Flow.fromProcessor(() ⇒ processor)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index ceb811904f..88de03067c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -6,6 +6,8 @@ package akka.stream.javadsl import akka.japi.function import akka.stream._ +import scala.concurrent.duration.FiniteDuration + object BidiFlow { /** * A graph with the shape of a BidiFlow logically is a BidiFlow, this method makes @@ -73,6 +75,17 @@ object BidiFlow { def fromFunctions[I1, O1, I2, O2](top: function.Function[I1, O1], bottom: function.Function[I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = new BidiFlow(scaladsl.BidiFlow.fromFunctions(top.apply _, bottom.apply _)) + /** + * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage. + * If the timeout is configured to be 1 seconds, then this stage will not fail even though there are elements flowing + * every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers + * the *joint* frequencies of the elements in both directions. + */ + def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] = + new BidiFlow(scaladsl.BidiFlow.bidirectionalIdleTimeout(timeout)) } class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 48fd37535e..1481f1a291 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -996,6 +996,27 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] = new Flow(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) + /** + * If the first element has not passed through this stage before the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + */ + def initialTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.initialTimeout(timeout)) + + /** + * If the completion of the stream does not happen until the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + */ + def completionTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.completionTimeout(timeout)) + + /** + * If the time between two processed elements exceed the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + */ + def idleTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.idleTimeout(timeout)) + override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index ff6889c0cd..e04f2eecd5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -846,6 +846,27 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = new Source(delegate.flatten(strategy)) + /** + * If the first element has not passed through this stage before the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + */ + def initialTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = + new Source(delegate.initialTimeout(timeout)) + + /** + * If the completion of the stream does not happen until the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + */ + def completionTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = + new Source(delegate.completionTimeout(timeout)) + + /** + * If the time between two processed elements exceed the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + */ + def idleTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = + new Source(delegate.idleTimeout(timeout)) + override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] = new Source(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 4152d64fb8..beaea9a013 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -5,9 +5,12 @@ package akka.stream.scaladsl import akka.stream._ import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.Timeouts + +import scala.concurrent.duration.FiniteDuration final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val module: Module) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { - override val shape = module.shape.asInstanceOf[BidiShape[I1, O1, I2, O2]] + override def shape = module.shape.asInstanceOf[BidiShape[I1, O1, I2, O2]] /** * Add the given BidiFlow as the next step in a bidirectional transformation @@ -193,4 +196,16 @@ object BidiFlow { */ def fromFunctions[I1, O1, I2, O2](outbound: I1 ⇒ O1, inbound: I2 ⇒ O2): BidiFlow[I1, O1, I2, O2, Unit] = fromFlows(Flow[I1].map(outbound), Flow[I2].map(inbound)) + + /** + * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed + * with a [[scala.concurrent.TimeoutException]]. + * + * There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage. + * If the timeout is configured to be 1 seconds, then this stage will not fail even though there are elements flowing + * every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers + * the *joint* frequencies of the elements in both directions. + */ + def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] = + fromGraph(new Timeouts.IdleBidi(timeout)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 5f099809bc..64babe0375 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -10,8 +10,7 @@ import akka.stream.impl.SplitDecision._ import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } -import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout } -import akka.stream.impl.{ Stages, StreamLayout } +import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timeouts } import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } import akka.stream.stage._ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } @@ -193,7 +192,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) // FIXME: Only exists to keep old stuff alive private[akka] def deprecatedAndThenMat[U, Mat2, O >: Out](processorFactory: () ⇒ (Processor[O, U], Mat2)): Repr[U, Mat2] = { - val op = Stages.DirectProcessor(processorFactory.asInstanceOf[() ⇒ (Processor[Any, Any], Any)]) + val op = DirectProcessor(processorFactory.asInstanceOf[() ⇒ (Processor[Any, Any], Any)]) if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat2]] else new Flow[In, U, Mat2](module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort))) } @@ -1019,6 +1018,24 @@ trait FlowOps[+Out, +Mat] { throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") } + /** + * If the first element has not passed through this stage before the provided timeout, the stream is failed + * with a [[scala.concurrent.TimeoutException]]. + */ + def initialTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Initial[Out](timeout)) + + /** + * If the completion of the stream does not happen until the provided timeout, the stream is failed + * with a [[scala.concurrent.TimeoutException]]. + */ + def completionTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Completion[Out](timeout)) + + /** + * If the time between two processed elements exceed the provided timeout, the stream is failed + * with a [[scala.concurrent.TimeoutException]]. + */ + def idleTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Idle[Out](timeout)) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 3f16f5258f..dd218b11af 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -13,7 +13,6 @@ import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.io.{ DelayedInitProcessor, StreamTcpManager } -import akka.stream.io.Timeouts import akka.util.ByteString import org.reactivestreams.{ Processor, Publisher, Subscriber } @@ -204,7 +203,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { val timeoutHandling = idleTimeout match { - case d: FiniteDuration ⇒ Flow[ByteString].join(Timeouts.idleTimeoutBidi[ByteString, ByteString](d)) + case d: FiniteDuration ⇒ Flow[ByteString].join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d)) case _ ⇒ Flow[ByteString] }