!str - 18608 - Moves the a.s.io.Timeouts functionality into Source/Flow/BidiFlow

This commit is contained in:
Viktor Klang 2015-11-01 13:13:35 +01:00
parent 06ce968b16
commit bbb942d44e
10 changed files with 128 additions and 75 deletions

View file

@ -13,9 +13,9 @@ import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable import scala.collection.immutable
import akka.util.ByteString import akka.util.ByteString
import akka.stream.{ ActorMaterializer, Materializer } import akka.stream.Materializer
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.io.{ Timeouts, SynchronousFileSource } import akka.stream.io.SynchronousFileSource
import akka.{ japi, stream } import akka.{ japi, stream }
import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.util.FastFuture
import akka.http.javadsl.{ model jm } import akka.http.javadsl.{ model jm }

View file

@ -2,17 +2,17 @@
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream.io package akka.stream.impl
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.stream.{ ClosedShape, ActorMaterializer }
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.{ AkkaSpec, TestSubscriber, TestPublisher }
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber }
import akka.stream.{ ActorMaterializer, ClosedShape }
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
class TimeoutsSpec extends AkkaSpec { class TimeoutsSpec extends AkkaSpec {
implicit val mat = ActorMaterializer() implicit val mat = ActorMaterializer()
@ -21,16 +21,18 @@ class TimeoutsSpec extends AkkaSpec {
"pass through elements unmodified" in assertAllStagesStopped { "pass through elements unmodified" in assertAllStagesStopped {
Await.result( Await.result(
Source(1 to 100).via(Timeouts.initalTimeout(2.seconds)).grouped(200).runWith(Sink.head), Source(1 to 100).initialTimeout(2.seconds).grouped(200).runWith(Sink.head),
3.seconds) should ===(1 to 100) 3.seconds) should ===(1 to 100)
} }
"pass through error unmodified" in assertAllStagesStopped { "pass through error unmodified" in assertAllStagesStopped {
a[TE] shouldBe thrownBy { a[TE] shouldBe thrownBy {
Await.result( Await.result(
Source(1 to 100).concat(Source.failed(TE("test"))) Source(1 to 100)
.via(Timeouts.initalTimeout(2.seconds)) .concat(Source.failed(TE("test")))
.grouped(200).runWith(Sink.head), .initialTimeout(2.seconds)
.grouped(200)
.runWith(Sink.head),
3.seconds) 3.seconds)
} }
} }
@ -38,7 +40,7 @@ class TimeoutsSpec extends AkkaSpec {
"fail if no initial element passes until timeout" in assertAllStagesStopped { "fail if no initial element passes until timeout" in assertAllStagesStopped {
val downstreamProbe = TestSubscriber.probe[Int]() val downstreamProbe = TestSubscriber.probe[Int]()
Source.maybe[Int] Source.maybe[Int]
.via(Timeouts.initalTimeout(1.seconds)) .initialTimeout(1.second)
.runWith(Sink(downstreamProbe)) .runWith(Sink(downstreamProbe))
downstreamProbe.expectSubscription() downstreamProbe.expectSubscription()
@ -54,7 +56,7 @@ class TimeoutsSpec extends AkkaSpec {
"pass through elements unmodified" in assertAllStagesStopped { "pass through elements unmodified" in assertAllStagesStopped {
Await.result( Await.result(
Source(1 to 100).via(Timeouts.completionTimeout(2.seconds)).grouped(200).runWith(Sink.head), Source(1 to 100).completionTimeout(2.seconds).grouped(200).runWith(Sink.head),
3.seconds) should ===(1 to 100) 3.seconds) should ===(1 to 100)
} }
@ -62,7 +64,7 @@ class TimeoutsSpec extends AkkaSpec {
a[TE] shouldBe thrownBy { a[TE] shouldBe thrownBy {
Await.result( Await.result(
Source(1 to 100).concat(Source.failed(TE("test"))) Source(1 to 100).concat(Source.failed(TE("test")))
.via(Timeouts.completionTimeout(2.seconds)) .completionTimeout(2.seconds)
.grouped(200).runWith(Sink.head), .grouped(200).runWith(Sink.head),
3.seconds) 3.seconds)
} }
@ -72,7 +74,7 @@ class TimeoutsSpec extends AkkaSpec {
val upstreamProbe = TestPublisher.probe[Int]() val upstreamProbe = TestPublisher.probe[Int]()
val downstreamProbe = TestSubscriber.probe[Int]() val downstreamProbe = TestSubscriber.probe[Int]()
Source(upstreamProbe) Source(upstreamProbe)
.via(Timeouts.completionTimeout(2.seconds)) .completionTimeout(2.seconds)
.runWith(Sink(downstreamProbe)) .runWith(Sink(downstreamProbe))
upstreamProbe.sendNext(1) upstreamProbe.sendNext(1)
@ -93,7 +95,7 @@ class TimeoutsSpec extends AkkaSpec {
"pass through elements unmodified" in assertAllStagesStopped { "pass through elements unmodified" in assertAllStagesStopped {
Await.result( Await.result(
Source(1 to 100).via(Timeouts.idleTimeout(2.seconds)).grouped(200).runWith(Sink.head), Source(1 to 100).idleTimeout(2.seconds).grouped(200).runWith(Sink.head),
3.seconds) should ===(1 to 100) 3.seconds) should ===(1 to 100)
} }
@ -101,7 +103,7 @@ class TimeoutsSpec extends AkkaSpec {
a[TE] shouldBe thrownBy { a[TE] shouldBe thrownBy {
Await.result( Await.result(
Source(1 to 100).concat(Source.failed(TE("test"))) Source(1 to 100).concat(Source.failed(TE("test")))
.via(Timeouts.idleTimeout(2.seconds)) .idleTimeout(2.seconds)
.grouped(200).runWith(Sink.head), .grouped(200).runWith(Sink.head),
3.seconds) 3.seconds)
} }
@ -111,7 +113,7 @@ class TimeoutsSpec extends AkkaSpec {
val upstreamProbe = TestPublisher.probe[Int]() val upstreamProbe = TestPublisher.probe[Int]()
val downstreamProbe = TestSubscriber.probe[Int]() val downstreamProbe = TestSubscriber.probe[Int]()
Source(upstreamProbe) Source(upstreamProbe)
.via(Timeouts.idleTimeout(1.seconds)) .idleTimeout(1.seconds)
.runWith(Sink(downstreamProbe)) .runWith(Sink(downstreamProbe))
// Two seconds in overall, but won't timeout until time between elements is large enough // Two seconds in overall, but won't timeout until time between elements is large enough
@ -131,7 +133,7 @@ class TimeoutsSpec extends AkkaSpec {
"IdleTimeoutBidi" must { "IdleTimeoutBidi" must {
"not signal error in simple loopback case and pass through elements unmodified" in assertAllStagesStopped { "not signal error in simple loopback case and pass through elements unmodified" in assertAllStagesStopped {
val timeoutIdentity = Timeouts.idleTimeoutBidi[Int, Int](2.seconds).join(Flow[Int]) val timeoutIdentity = BidiFlow.bidirectionalIdleTimeout[Int, Int](2.seconds).join(Flow[Int])
Await.result( Await.result(
Source(1 to 100).via(timeoutIdentity).grouped(200).runWith(Sink.head), Source(1 to 100).via(timeoutIdentity).grouped(200).runWith(Sink.head),
@ -146,7 +148,7 @@ class TimeoutsSpec extends AkkaSpec {
val downstream = Flow.fromSinkAndSourceMat(Sink.ignore, Source(downstreamWriter))(Keep.left) val downstream = Flow.fromSinkAndSourceMat(Sink.ignore, Source(downstreamWriter))(Keep.left)
val assembly: RunnableGraph[(Future[Unit], Future[Unit])] = upstream val assembly: RunnableGraph[(Future[Unit], Future[Unit])] = upstream
.joinMat(Timeouts.idleTimeoutBidi[Int, String](2.seconds))(Keep.left) .joinMat(BidiFlow.bidirectionalIdleTimeout[Int, String](2.seconds))(Keep.left)
.joinMat(downstream)(Keep.both) .joinMat(downstream)(Keep.both)
val (upFinished, downFinished) = assembly.run() val (upFinished, downFinished) = assembly.run()
@ -174,7 +176,7 @@ class TimeoutsSpec extends AkkaSpec {
RunnableGraph.fromGraph(FlowGraph.create() { implicit b RunnableGraph.fromGraph(FlowGraph.create() { implicit b
import FlowGraph.Implicits._ import FlowGraph.Implicits._
val timeoutStage = b.add(Timeouts.idleTimeoutBidi[String, Int](2.seconds)) val timeoutStage = b.add(BidiFlow.bidirectionalIdleTimeout[String, Int](2.seconds))
Source(upWrite) ~> timeoutStage.in1; Source(upWrite) ~> timeoutStage.in1;
timeoutStage.out1 ~> Sink(downRead) timeoutStage.out1 ~> Sink(downRead)
Sink(upRead) <~ timeoutStage.out2; Sink(upRead) <~ timeoutStage.out2;
@ -222,7 +224,7 @@ class TimeoutsSpec extends AkkaSpec {
RunnableGraph.fromGraph(FlowGraph.create() { implicit b RunnableGraph.fromGraph(FlowGraph.create() { implicit b
import FlowGraph.Implicits._ import FlowGraph.Implicits._
val timeoutStage = b.add(Timeouts.idleTimeoutBidi[String, Int](2.seconds)) val timeoutStage = b.add(BidiFlow.bidirectionalIdleTimeout[String, Int](2.seconds))
Source(upWrite) ~> timeoutStage.in1; Source(upWrite) ~> timeoutStage.in1;
timeoutStage.out1 ~> Sink(downRead) timeoutStage.out1 ~> Sink(downRead)
Sink(upRead) <~ timeoutStage.out2; Sink(upRead) <~ timeoutStage.out2;

View file

@ -1,15 +1,16 @@
package akka.stream.io package akka.stream.impl
import java.util.concurrent.{ TimeUnit, TimeoutException } import java.util.concurrent.{ TimeUnit, TimeoutException }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.{ BidiFlow, Flow }
import akka.stream.stage._
import akka.stream.{ BidiShape, Inlet, Outlet, Attributes } import akka.stream.{ BidiShape, Inlet, Outlet, Attributes }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic }
import scala.concurrent.duration.{ Deadline, FiniteDuration } import scala.concurrent.duration.{ Deadline, FiniteDuration }
/** /**
* INTERNAL API
*
* Various stages for controlling timeouts on IO related streams (although not necessarily). * Various stages for controlling timeouts on IO related streams (although not necessarily).
* *
* The common theme among the processing stages here that * The common theme among the processing stages here that
@ -18,41 +19,7 @@ import scala.concurrent.duration.{ Deadline, FiniteDuration }
* - if the timer fires before the event happens, these stages all fail the stream * - if the timer fires before the event happens, these stages all fail the stream
* - otherwise, these streams do not interfere with the element flow, ordinary completion or failure * - otherwise, these streams do not interfere with the element flow, ordinary completion or failure
*/ */
object Timeouts { private[stream] object Timeouts {
/**
* If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[TimeoutException]].
*/
def initalTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
Flow.fromGraph(new InitialTimeout[T](timeout))
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[TimeoutException]].
*/
def completionTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
Flow.fromGraph(new CompletionTimeout[T](timeout))
/**
* If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[TimeoutException]].
*/
def idleTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
Flow.fromGraph(new IdleTimeout[T](timeout))
/**
* If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed
* with a [[TimeoutException]].
*
* There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage.
* If the timout is configured to be 1 seconds, then this stage will not fail even though there are elements flowing
* every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers
* the *joint* frequencies of the elements in both directions.
*/
def idleTimeoutBidi[A, B](timeout: FiniteDuration): BidiFlow[A, A, B, B, Unit] =
BidiFlow.fromGraph(new IdleTimeoutBidi[A, B](timeout))
private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = {
import scala.concurrent.duration._ import scala.concurrent.duration._
FiniteDuration( FiniteDuration(
@ -60,8 +27,7 @@ object Timeouts {
TimeUnit.NANOSECONDS) TimeUnit.NANOSECONDS)
} }
private class InitialTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class Initial[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private var initialHasPassed = false private var initialHasPassed = false
@ -86,7 +52,7 @@ object Timeouts {
override def toString = "InitialTimeoutTimer" override def toString = "InitialTimeoutTimer"
} }
private class CompletionTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class Completion[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
setHandler(in, new InHandler { setHandler(in, new InHandler {
@ -106,7 +72,7 @@ object Timeouts {
override def toString = "CompletionTimeout" override def toString = "CompletionTimeout"
} }
private class IdleTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
private var nextDeadline: Deadline = Deadline.now + timeout private var nextDeadline: Deadline = Deadline.now + timeout
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
@ -131,7 +97,7 @@ object Timeouts {
override def toString = "IdleTimeout" override def toString = "IdleTimeout"
} }
private class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { final class IdleBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] {
val in1 = Inlet[I]("in1") val in1 = Inlet[I]("in1")
val in2 = Inlet[O]("in2") val in2 = Inlet[O]("in2")
val out1 = Outlet[I]("out1") val out1 = Outlet[I]("out1")

View file

@ -9,8 +9,7 @@ import akka.actor._
import akka.io.Tcp._ import akka.io.Tcp._
import akka.io.{ IO, Tcp } import akka.io.{ IO, Tcp }
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.io.Timeouts import akka.stream.scaladsl.{ Tcp StreamTcp, BidiFlow, Flow }
import akka.stream.scaladsl.{ Flow, Tcp StreamTcp }
import akka.stream.{ ActorMaterializerSettings, BindFailedException, ConnectionException } import akka.stream.{ ActorMaterializerSettings, BindFailedException, ConnectionException }
import akka.util.ByteString import akka.util.ByteString
import org.reactivestreams.Subscriber import org.reactivestreams.Subscriber
@ -158,7 +157,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
val handler = (idleTimeout match { val handler = (idleTimeout match {
case d: FiniteDuration Flow[ByteString].join(Timeouts.idleTimeoutBidi[ByteString, ByteString](d)) case d: FiniteDuration Flow[ByteString].join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d))
case _ Flow[ByteString] case _ Flow[ByteString]
}).via(Flow.fromProcessor(() processor)) }).via(Flow.fromProcessor(() processor))

View file

@ -6,6 +6,8 @@ package akka.stream.javadsl
import akka.japi.function import akka.japi.function
import akka.stream._ import akka.stream._
import scala.concurrent.duration.FiniteDuration
object BidiFlow { object BidiFlow {
/** /**
* A graph with the shape of a BidiFlow logically is a BidiFlow, this method makes * A graph with the shape of a BidiFlow logically is a BidiFlow, this method makes
@ -73,6 +75,17 @@ object BidiFlow {
def fromFunctions[I1, O1, I2, O2](top: function.Function[I1, O1], bottom: function.Function[I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = def fromFunctions[I1, O1, I2, O2](top: function.Function[I1, O1], bottom: function.Function[I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] =
new BidiFlow(scaladsl.BidiFlow.fromFunctions(top.apply _, bottom.apply _)) new BidiFlow(scaladsl.BidiFlow.fromFunctions(top.apply _, bottom.apply _))
/**
* If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage.
* If the timeout is configured to be 1 seconds, then this stage will not fail even though there are elements flowing
* every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers
* the *joint* frequencies of the elements in both directions.
*/
def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] =
new BidiFlow(scaladsl.BidiFlow.bidirectionalIdleTimeout(timeout))
} }
class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] {

View file

@ -996,6 +996,27 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] = matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] =
new Flow(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF))) new Flow(delegate.zipWithMat[Out2, Out3, M, M2](that)(combinerToScala(combine))(combinerToScala(matF)))
/**
* If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*/
def initialTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.initialTimeout(timeout))
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*/
def completionTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.completionTimeout(timeout))
/**
* If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*/
def idleTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.idleTimeout(timeout))
override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr)) new Flow(delegate.withAttributes(attr))

View file

@ -846,6 +846,27 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] =
new Source(delegate.flatten(strategy)) new Source(delegate.flatten(strategy))
/**
* If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*/
def initialTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialTimeout(timeout))
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*/
def completionTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.completionTimeout(timeout))
/**
* If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*/
def idleTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.idleTimeout(timeout))
override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] = override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr)) new Source(delegate.withAttributes(attr))

View file

@ -5,9 +5,12 @@ package akka.stream.scaladsl
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.Timeouts
import scala.concurrent.duration.FiniteDuration
final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val module: Module) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val module: Module) extends Graph[BidiShape[I1, O1, I2, O2], Mat] {
override val shape = module.shape.asInstanceOf[BidiShape[I1, O1, I2, O2]] override def shape = module.shape.asInstanceOf[BidiShape[I1, O1, I2, O2]]
/** /**
* Add the given BidiFlow as the next step in a bidirectional transformation * Add the given BidiFlow as the next step in a bidirectional transformation
@ -193,4 +196,16 @@ object BidiFlow {
*/ */
def fromFunctions[I1, O1, I2, O2](outbound: I1 O1, inbound: I2 O2): BidiFlow[I1, O1, I2, O2, Unit] = def fromFunctions[I1, O1, I2, O2](outbound: I1 O1, inbound: I2 O2): BidiFlow[I1, O1, I2, O2, Unit] =
fromFlows(Flow[I1].map(outbound), Flow[I2].map(inbound)) fromFlows(Flow[I1].map(outbound), Flow[I2].map(inbound))
/**
* If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]].
*
* There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage.
* If the timeout is configured to be 1 seconds, then this stage will not fail even though there are elements flowing
* every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers
* the *joint* frequencies of the elements in both directions.
*/
def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] =
fromGraph(new Timeouts.IdleBidi(timeout))
} }

View file

@ -10,8 +10,7 @@ import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage } import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered }
import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout } import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timeouts }
import akka.stream.impl.{ Stages, StreamLayout }
import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage }
import akka.stream.stage._ import akka.stream.stage._
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
@ -193,7 +192,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
// FIXME: Only exists to keep old stuff alive // FIXME: Only exists to keep old stuff alive
private[akka] def deprecatedAndThenMat[U, Mat2, O >: Out](processorFactory: () (Processor[O, U], Mat2)): Repr[U, Mat2] = { private[akka] def deprecatedAndThenMat[U, Mat2, O >: Out](processorFactory: () (Processor[O, U], Mat2)): Repr[U, Mat2] = {
val op = Stages.DirectProcessor(processorFactory.asInstanceOf[() (Processor[Any, Any], Any)]) val op = DirectProcessor(processorFactory.asInstanceOf[() (Processor[Any, Any], Any)])
if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat2]] if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat2]]
else new Flow[In, U, Mat2](module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort))) else new Flow[In, U, Mat2](module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort)))
} }
@ -1019,6 +1018,24 @@ trait FlowOps[+Out, +Mat] {
throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]")
} }
/**
* If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]].
*/
def initialTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Initial[Out](timeout))
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]].
*/
def completionTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Completion[Out](timeout))
/**
* If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]].
*/
def idleTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Idle[Out](timeout))
/** /**
* Logs elements flowing through the stream as well as completion and erroring. * Logs elements flowing through the stream as well as completion and erroring.
* *

View file

@ -13,7 +13,6 @@ import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.impl.io.{ DelayedInitProcessor, StreamTcpManager } import akka.stream.impl.io.{ DelayedInitProcessor, StreamTcpManager }
import akka.stream.io.Timeouts
import akka.util.ByteString import akka.util.ByteString
import org.reactivestreams.{ Processor, Publisher, Subscriber } import org.reactivestreams.{ Processor, Publisher, Subscriber }
@ -204,7 +203,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = { idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
val timeoutHandling = idleTimeout match { val timeoutHandling = idleTimeout match {
case d: FiniteDuration Flow[ByteString].join(Timeouts.idleTimeoutBidi[ByteString, ByteString](d)) case d: FiniteDuration Flow[ByteString].join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d))
case _ Flow[ByteString] case _ Flow[ByteString]
} }