From 12c9abb8c939833f304ae1cdabae810c061dd576 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 11 Sep 2015 15:50:17 +0200 Subject: [PATCH] !str #16410 #16597: Removed TimerTransform, added Timeout stages --- .../scala/akka/http/impl/util/package.scala | 47 ++++ .../akka/http/scaladsl/model/HttpEntity.scala | 31 +-- .../fusing/ActorGraphInterpreterSpec.scala | 46 ++++ .../impl/fusing/GraphInterpreterSpec.scala | 5 +- .../scala/akka/stream/io/TimeoutsSpec.scala | 242 ++++++++++++++++++ .../stream/scaladsl/FlowDropWithinSpec.scala | 10 + .../scaladsl/FlowGroupedWithinSpec.scala | 39 +-- .../scaladsl/FlowTimerTransformerSpec.scala | 87 ------- .../scaladsl/GraphStageTimersSpec.scala | 214 ++++++++++++++++ .../scaladsl/TimerTransformerSpec.scala | 133 ---------- .../main/scala/akka/stream/Materializer.scala | 28 ++ .../scala/akka/stream/TimerTransformer.scala | 131 ---------- .../stream/impl/ActorMaterializerImpl.scala | 10 +- .../main/scala/akka/stream/impl/Stages.scala | 7 +- .../impl/TimerTransformerProcessorsImpl.scala | 98 ------- .../impl/fusing/ActorGraphInterpreter.scala | 25 +- .../stream/impl/fusing/GraphInterpreter.scala | 26 +- .../akka/stream/impl/fusing/GraphStages.scala | 24 +- .../scala/akka/stream/impl/fusing/Ops.scala | 121 +++++++++ .../main/scala/akka/stream/io/Timeouts.scala | 171 +++++++++++++ .../scala/akka/stream/scaladsl/Flow.scala | 181 ++++--------- .../scala/akka/stream/scaladsl/Graph.scala | 11 + .../scala/akka/stream/scaladsl/Source.scala | 8 - .../scala/akka/stream/stage/GraphStage.scala | 167 +++++++++++- .../main/scala/akka/stream/stage/Stage.scala | 2 +- 25 files changed, 1185 insertions(+), 679 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/TimeoutsSpec.scala delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/TimerTransformerSpec.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/TimerTransformer.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/Timeouts.scala diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index 001e9886d8..1737e472d2 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -127,6 +127,53 @@ package object util { package util { + import akka.http.scaladsl.model.{ ContentType, HttpEntity } + import akka.stream.{ Outlet, Inlet, FlowShape } + import scala.concurrent.duration.FiniteDuration + + private[http] class ToStrict(timeout: FiniteDuration, contentType: ContentType) + extends GraphStage[FlowShape[ByteString, HttpEntity.Strict]] { + + val in = Inlet[ByteString]("in") + val out = Outlet[HttpEntity.Strict]("out") + override val shape = FlowShape(in, out) + + override def createLogic: GraphStageLogic = new GraphStageLogic { + var bytes = ByteString.newBuilder + private var emptyStream = false + + scheduleOnce("ToStrictTimeoutTimer", timeout) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (emptyStream) { + push(out, HttpEntity.Strict(contentType, ByteString.empty)) + completeStage() + } else pull(in) + } + }) + + 
setHandler(in, new InHandler { + override def onPush(): Unit = { + bytes ++= grab(in) + pull(in) + } + override def onUpstreamFinish(): Unit = { + if (isAvailable(out)) { + push(out, HttpEntity.Strict(contentType, bytes.result())) + completeStage() + } else emptyStream = true + } + }) + + override def onTimer(key: Any): Unit = + failStage(new java.util.concurrent.TimeoutException( + s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data")) + } + + override def toString = "ToStrict" + } + private[http] class EventStreamLogger extends Actor with ActorLogging { def receive = { case x ⇒ log.warning(x.toString) } } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 8a25563335..537d0868d7 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -13,11 +13,10 @@ import scala.concurrent.Future import scala.concurrent.duration._ import scala.collection.immutable import akka.util.ByteString -import akka.stream.Materializer +import akka.stream.{ ActorMaterializer, Materializer } import akka.stream.scaladsl._ -import akka.stream.io.SynchronousFileSource +import akka.stream.io.{ Timeouts, SynchronousFileSource } import akka.{ japi, stream } -import akka.stream.TimerTransformer import akka.http.scaladsl.util.FastFuture import akka.http.javadsl.{ model ⇒ jm } import akka.http.impl.util.JavaMapping.Implicits._ @@ -55,28 +54,10 @@ sealed trait HttpEntity extends jm.HttpEntity { * Collects all possible parts and returns a potentially future Strict entity for easier processing. * The Future is failed with an TimeoutException if the stream isn't completed after the given timeout. */ - def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] = { - def transformer() = - new TimerTransformer[ByteString, HttpEntity.Strict] { - var bytes = ByteString.newBuilder - scheduleOnce("", timeout) - - def onNext(element: ByteString): immutable.Seq[HttpEntity.Strict] = { - bytes ++= element - Nil - } - - override def onTermination(e: Option[Throwable]): immutable.Seq[HttpEntity.Strict] = - HttpEntity.Strict(contentType, bytes.result()) :: Nil - - def onTimer(timerKey: Any): immutable.Seq[HttpEntity.Strict] = - throw new java.util.concurrent.TimeoutException( - s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data") - } - - // TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393 - dataBytes.via(Flow[ByteString].timerTransform(transformer).named("toStrict")).runWith(Sink.head) - } + def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] = + dataBytes + .via(new akka.http.impl.util.ToStrict(timeout, contentType)) + .runWith(Sink.head) /** * Returns a copy of the given entity with the ByteString chunks of this entity transformed by the given transformer. 
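The hunks above replace the actor-based TimerTransformer previously used by HttpEntity.toStrict with a dedicated ToStrict GraphStage that buffers incoming ByteString chunks and fails the stream with a TimeoutException when its timer fires first. A minimal usage sketch (illustrative only, not part of the patch; the value response stands in for some previously obtained HttpResponse and the actor system name is made up):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.model.HttpResponse
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system = ActorSystem("to-strict-example")
implicit val mat = ActorMaterializer()

def readAll(response: HttpResponse) =
  // dataBytes is now run through the new ToStrict stage; the Future fails with a
  // TimeoutException if the entity's data stream has not completed within 1 second.
  Await.result(response.entity.toStrict(1.second), 2.seconds)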
diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index 4b0c607a3d..c204319cb2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -80,6 +80,52 @@ class ActorGraphInterpreterSpec extends AkkaSpec { } + "be able to interpret and reuse a simple bidi stage" in assertAllStagesStopped { + val identityBidi = new GraphStage[BidiShape[Int, Int, Int, Int]] { + val in1 = Inlet[Int]("in1") + val in2 = Inlet[Int]("in2") + val out1 = Outlet[Int]("out1") + val out2 = Outlet[Int]("out2") + val shape = BidiShape(in1, out1, in2, out2) + + override def createLogic: GraphStageLogic = new GraphStageLogic { + setHandler(in1, new InHandler { + override def onPush(): Unit = push(out1, grab(in1)) + + override def onUpstreamFinish(): Unit = complete(out1) + }) + + setHandler(in2, new InHandler { + override def onPush(): Unit = push(out2, grab(in2)) + + override def onUpstreamFinish(): Unit = complete(out2) + }) + + setHandler(out1, new OutHandler { + override def onPull(): Unit = pull(in1) + + override def onDownstreamFinish(): Unit = cancel(in1) + }) + + setHandler(out2, new OutHandler { + override def onPull(): Unit = pull(in2) + + override def onDownstreamFinish(): Unit = cancel(in2) + }) + } + + override def toString = "IdentityBidi" + } + + val identityBidiF = BidiFlow.wrap(identityBidi) + val identity = (identityBidiF atop identityBidiF atop identityBidiF).join(Flow[Int].map { x ⇒ x }) + + Await.result( + Source(1 to 10).via(identity).grouped(100).runWith(Sink.head), + 3.seconds) should ===(1 to 10) + + } + "be able to interpret and resuse a simple bidi stage" in assertAllStagesStopped { val identityBidi = new GraphStage[BidiShape[Int, Int, Int, Int]] { val in1 = Inlet[Int]("in1") diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index 698e7a9c6f..d9c7ac6d9d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -401,7 +401,7 @@ object GraphInterpreterSpec { (Vector.fill(upstreams.size)(null) ++ outs).toArray, (Vector.fill(upstreams.size)(-1) ++ outOwners).toArray) - _interpreter = new GraphInterpreter(assembly, (_, _, _) ⇒ ()) + _interpreter = new GraphInterpreter(assembly, NoMaterializer, (_, _, _) ⇒ ()) for ((upstream, i) ← upstreams.zipWithIndex) { _interpreter.attachUpstreamBoundary(i, upstream._1) @@ -415,7 +415,8 @@ object GraphInterpreterSpec { } } - def manualInit(assembly: GraphAssembly): Unit = _interpreter = new GraphInterpreter(assembly, (_, _, _) ⇒ ()) + def manualInit(assembly: GraphAssembly): Unit = + _interpreter = new GraphInterpreter(assembly, NoMaterializer, (_, _, _) ⇒ ()) def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TimeoutsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TimeoutsSpec.scala new file mode 100644 index 0000000000..1a980e2e3e --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TimeoutsSpec.scala @@ -0,0 +1,242 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. 
+ */ + +package akka.stream.io + +import java.util.concurrent.TimeoutException + +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.stream.testkit.{ AkkaSpec, TestSubscriber, TestPublisher } +import scala.concurrent.{ Future, Await } +import scala.concurrent.duration._ + +import akka.stream.testkit.Utils._ + +class TimeoutsSpec extends AkkaSpec { + implicit val mat = ActorMaterializer() + + "InitialTimeout" must { + + "pass through elements unmodified" in assertAllStagesStopped { + Await.result( + Source(1 to 100).via(Timeouts.initalTimeout(2.seconds)).grouped(200).runWith(Sink.head), + 3.seconds) should ===(1 to 100) + } + + "pass through error unmodified" in assertAllStagesStopped { + a[TE] shouldBe thrownBy { + Await.result( + Source(1 to 100).concat(Source.failed(TE("test"))) + .via(Timeouts.initalTimeout(2.seconds)) + .grouped(200).runWith(Sink.head), + 3.seconds) + } + } + + "fail if no initial element passes until timeout" in assertAllStagesStopped { + val downstreamProbe = TestSubscriber.probe[Int]() + Source.lazyEmpty[Int] + .via(Timeouts.initalTimeout(1.seconds)) + .runWith(Sink(downstreamProbe)) + + downstreamProbe.expectSubscription() + downstreamProbe.expectNoMsg(500.millis) + + val ex = downstreamProbe.expectError() + ex.getMessage should ===("The first element has not yet passed through in 1 second.") + } + + } + + "CompletionTimeout" must { + + "pass through elements unmodified" in assertAllStagesStopped { + Await.result( + Source(1 to 100).via(Timeouts.completionTimeout(2.seconds)).grouped(200).runWith(Sink.head), + 3.seconds) should ===(1 to 100) + } + + "pass through error unmodified" in assertAllStagesStopped { + a[TE] shouldBe thrownBy { + Await.result( + Source(1 to 100).concat(Source.failed(TE("test"))) + .via(Timeouts.completionTimeout(2.seconds)) + .grouped(200).runWith(Sink.head), + 3.seconds) + } + } + + "fail if not completed until timeout" in assertAllStagesStopped { + val upstreamProbe = TestPublisher.probe[Int]() + val downstreamProbe = TestSubscriber.probe[Int]() + Source(upstreamProbe) + .via(Timeouts.completionTimeout(2.seconds)) + .runWith(Sink(downstreamProbe)) + + upstreamProbe.sendNext(1) + downstreamProbe.requestNext(1) + downstreamProbe.expectNoMsg(500.millis) // No timeout yet + + upstreamProbe.sendNext(2) + downstreamProbe.requestNext(2) + downstreamProbe.expectNoMsg(500.millis) // No timeout yet + + val ex = downstreamProbe.expectError() + ex.getMessage should ===("The stream has not been completed in 2 seconds.") + } + + } + + "IdleTimeout" must { + + "pass through elements unmodified" in assertAllStagesStopped { + Await.result( + Source(1 to 100).via(Timeouts.idleTimeout(2.seconds)).grouped(200).runWith(Sink.head), + 3.seconds) should ===(1 to 100) + } + + "pass through error unmodified" in assertAllStagesStopped { + a[TE] shouldBe thrownBy { + Await.result( + Source(1 to 100).concat(Source.failed(TE("test"))) + .via(Timeouts.idleTimeout(2.seconds)) + .grouped(200).runWith(Sink.head), + 3.seconds) + } + } + + "fail if time between elements is too large" in assertAllStagesStopped { + val upstreamProbe = TestPublisher.probe[Int]() + val downstreamProbe = TestSubscriber.probe[Int]() + Source(upstreamProbe) + .via(Timeouts.idleTimeout(1.seconds)) + .runWith(Sink(downstreamProbe)) + + // Two seconds in overall, but won't timeout until time between elements is large enough + // (i.e. 
this works differently from completionTimeout) + for (_ ← 1 to 4) { + upstreamProbe.sendNext(1) + downstreamProbe.requestNext(1) + downstreamProbe.expectNoMsg(500.millis) // No timeout yet + } + + val ex = downstreamProbe.expectError() + ex.getMessage should ===("No elements passed in the last 1 second.") + } + + } + + "IdleTimeoutBidi" must { + + "not signal error in simple loopback case and pass through elements unmodified" in assertAllStagesStopped { + val timeoutIdentity = Timeouts.idleTimeoutBidi[Int, Int](2.seconds).join(Flow[Int]) + + Await.result( + Source(1 to 100).via(timeoutIdentity).grouped(200).runWith(Sink.head), + 3.seconds) should ===(1 to 100) + } + + "not signal error if traffic is one-way" in assertAllStagesStopped { + val upstreamWriter = TestPublisher.probe[Int]() + val downstreamWriter = TestPublisher.probe[String]() + + val upstream = Flow.wrap(Sink.ignore, Source(upstreamWriter))(Keep.left) + val downstream = Flow.wrap(Sink.ignore, Source(downstreamWriter))(Keep.left) + + val assembly: RunnableGraph[(Future[Unit], Future[Unit])] = upstream + .joinMat(Timeouts.idleTimeoutBidi[Int, String](2.seconds))(Keep.left) + .joinMat(downstream)(Keep.both) + + val (upFinished, downFinished) = assembly.run() + + upstreamWriter.sendNext(1) + Thread.sleep(1000) + upstreamWriter.sendNext(1) + Thread.sleep(1000) + upstreamWriter.sendNext(1) + Thread.sleep(1000) + + upstreamWriter.sendComplete() + downstreamWriter.sendComplete() + + Await.ready(upFinished, 3.seconds) + Await.ready(downFinished, 3.seconds) + } + + "be able to signal timeout once no traffic on either sides" in assertAllStagesStopped { + val upWrite = TestPublisher.probe[String]() + val upRead = TestSubscriber.probe[Int]() + + val downWrite = TestPublisher.probe[Int]() + val downRead = TestSubscriber.probe[String]() + + FlowGraph.closed() { implicit b ⇒ + import FlowGraph.Implicits._ + val timeoutStage = b.add(Timeouts.idleTimeoutBidi[String, Int](2.seconds)) + Source(upWrite) ~> timeoutStage.in1; + timeoutStage.out1 ~> Sink(downRead) + Sink(upRead) <~ timeoutStage.out2; + timeoutStage.in2 <~ Source(downWrite) + }.run() + + // Request enough for the whole test + upRead.request(100) + downRead.request(100) + + upWrite.sendNext("DATA1") + downRead.expectNext("DATA1") + Thread.sleep(1500) + + downWrite.sendNext(1) + upRead.expectNext(1) + Thread.sleep(1500) + + upWrite.sendNext("DATA2") + downRead.expectNext("DATA2") + Thread.sleep(1000) + + downWrite.sendNext(2) + upRead.expectNext(2) + + upRead.expectNoMsg(500.millis) + val error1 = upRead.expectError() + val error2 = downRead.expectError() + + error1.isInstanceOf[TimeoutException] should be(true) + error1.getMessage should ===("No elements passed in the last 2 seconds.") + error2 should ===(error1) + + upWrite.expectCancellation() + downWrite.expectCancellation() + } + + "signal error to all outputs" in assertAllStagesStopped { + val upWrite = TestPublisher.probe[String]() + val upRead = TestSubscriber.probe[Int]() + + val downWrite = TestPublisher.probe[Int]() + val downRead = TestSubscriber.probe[String]() + + FlowGraph.closed() { implicit b ⇒ + import FlowGraph.Implicits._ + val timeoutStage = b.add(Timeouts.idleTimeoutBidi[String, Int](2.seconds)) + Source(upWrite) ~> timeoutStage.in1; + timeoutStage.out1 ~> Sink(downRead) + Sink(upRead) <~ timeoutStage.out2; + timeoutStage.in2 <~ Source(downWrite) + }.run() + + val te = TE("test") + + upWrite.sendError(te) + + upRead.expectSubscriptionAndError(te) + downRead.expectSubscriptionAndError(te) + 
downWrite.expectCancellation() + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWithinSpec.scala index 03ea8eba0a..c96e8400be 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDropWithinSpec.scala @@ -35,6 +35,16 @@ class FlowDropWithinSpec extends AkkaSpec { c.expectNoMsg(200.millis) } + "deliver completion even before the duration" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + Source(upstream).dropWithin(1.day).runWith(Sink(downstream)) + + upstream.sendComplete() + downstream.expectSubscriptionAndComplete() + } + } } \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index 507dc83622..1057257364 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -100,27 +100,28 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { } "reset time window when max elements reached" in { - val input = Iterator.from(1) - val p = TestPublisher.manualProbe[Int]() - val c = TestSubscriber.manualProbe[immutable.Seq[Int]]() - Source(p).groupedWithin(3, 2.second).to(Sink(c)).run() - val pSub = p.expectSubscription - val cSub = c.expectSubscription - cSub.request(4) - val demand1 = pSub.expectRequest.toInt - demand1 should be(4) - c.expectNoMsg(1000.millis) - (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } - c.within(1000.millis) { - c.expectNext((1 to 3).toVector) + val inputs = Iterator.from(1) + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + Source(upstream).groupedWithin(3, 2.second).to(Sink(downstream)).run() + + downstream.request(2) + downstream.expectNoMsg(1000.millis) + + (1 to 4) foreach { _ ⇒ upstream.sendNext(inputs.next()) } + downstream.within(1000.millis) { + downstream.expectNext((1 to 3).toVector) } - c.expectNoMsg(1500.millis) - c.within(1000.millis) { - c.expectNext(List(4)) + + downstream.expectNoMsg(1500.millis) + + downstream.within(1000.millis) { + downstream.expectNext(List(4)) } - pSub.sendComplete() - c.expectComplete - c.expectNoMsg(100.millis) + + upstream.sendComplete() + downstream.expectComplete + downstream.expectNoMsg(100.millis) } "group evenly" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala deleted file mode 100644 index 552a1654ed..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. 
- */ -package akka.stream.scaladsl - -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace - -import akka.stream.ActorMaterializer -import akka.stream.TimerTransformer - -import akka.stream.testkit._ -import akka.stream.testkit.Utils._ - -class FlowTimerTransformerSpec extends AkkaSpec { - - implicit val materializer = ActorMaterializer() - - "A Flow with TimerTransformer operations" must { - "produce scheduled ticks as expected" in assertAllStagesStopped { - val p = TestPublisher.manualProbe[Int]() - val p2 = Source(p). - timerTransform(() ⇒ new TimerTransformer[Int, Int] { - schedulePeriodically("tick", 100.millis) - var tickCount = 0 - override def onNext(elem: Int) = List(elem) - override def onTimer(timerKey: Any) = { - tickCount += 1 - if (tickCount == 3) cancelTimer("tick") - List(tickCount) - } - override def isComplete: Boolean = !isTimerActive("tick") - }). - runWith(Sink.publisher) - val subscriber = TestSubscriber.manualProbe[Int]() - p2.subscribe(subscriber) - val subscription = subscriber.expectSubscription() - subscription.request(5) - subscriber.expectNext(1) - subscriber.expectNext(2) - subscriber.expectNext(3) - subscriber.expectComplete() - } - - "schedule ticks when last transformation step (consume)" in { - val p = TestPublisher.manualProbe[Int]() - val p2 = Source(p). - timerTransform(() ⇒ new TimerTransformer[Int, Int] { - schedulePeriodically("tick", 100.millis) - var tickCount = 0 - override def onNext(elem: Int) = List(elem) - override def onTimer(timerKey: Any) = { - tickCount += 1 - if (tickCount == 3) cancelTimer("tick") - testActor ! "tick-" + tickCount - List(tickCount) - } - override def isComplete: Boolean = !isTimerActive("tick") - }). - to(Sink.ignore).run() - val pSub = p.expectSubscription() - expectMsg("tick-1") - expectMsg("tick-2") - expectMsg("tick-3") - pSub.sendComplete() - } - - "propagate error if onTimer throws an exception" in assertAllStagesStopped { - val exception = new Exception("Expected exception to the rule") with NoStackTrace - val p = TestPublisher.manualProbe[Int]() - val p2 = Source(p). 
- timerTransform(() ⇒ new TimerTransformer[Int, Int] { - scheduleOnce("tick", 100.millis) - - def onNext(element: Int) = Nil - override def onTimer(timerKey: Any) = - throw exception - }).runWith(Sink.publisher) - - val subscriber = TestSubscriber.manualProbe[Int]() - p2.subscribe(subscriber) - val subscription = subscriber.expectSubscription() - subscription.request(5) - subscriber.expectError(exception) - } - } -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala new file mode 100644 index 0000000000..5fb20ff96c --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala @@ -0,0 +1,214 @@ +package akka.stream.scaladsl + +import akka.actor.ActorRef +import akka.stream.ActorMaterializer +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.stage.{ OutHandler, AsyncCallback, InHandler } +import akka.stream.testkit.{ AkkaSpec, TestPublisher } +import akka.testkit.TestDuration + +import scala.concurrent.Promise +import scala.concurrent.duration._ + +import akka.stream.testkit._ +import akka.stream.testkit.Utils._ + +object GraphStageTimersSpec { + case object TestSingleTimer + case object TestSingleTimerResubmit + case object TestCancelTimer + case object TestCancelTimerAck + case object TestRepeatedTimer + case class Tick(n: Int) + + class SideChannel { + @volatile var asyncCallback: AsyncCallback[Any] = _ + @volatile var stopPromise: Promise[Unit] = _ + + def isReady: Boolean = asyncCallback ne null + def !(msg: Any) = asyncCallback.invoke(msg) + + def stopStage(): Unit = stopPromise.trySuccess(()) + } + +} + +class GraphStageTimersSpec extends AkkaSpec { + import GraphStageTimersSpec._ + + implicit val mat = ActorMaterializer() + + class TestStage(probe: ActorRef, sideChannel: SideChannel) extends SimpleLinearGraphStage[Int] { + override def createLogic = new SimpleLinearStageLogic { + val tickCount = Iterator from 1 + + setHandler(in, new InHandler { + override def onPush() = push(out, grab(in)) + }) + + override def preStart() = { + sideChannel.asyncCallback = getAsyncCallback(onTestEvent) + } + + override protected def onTimer(timerKey: Any) = { + val tick = Tick(tickCount.next()) + probe ! tick + if (timerKey == "TestSingleTimerResubmit" && tick.n == 1) + scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated) + else if (timerKey == "TestRepeatedTimer" && tick.n == 5) + cancelTimer("TestRepeatedTimer") + Nil + } + + private def onTestEvent(event: Any): Unit = event match { + case TestSingleTimer ⇒ + scheduleOnce("TestSingleTimer", 500.millis.dilated) + case TestSingleTimerResubmit ⇒ + scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated) + case TestCancelTimer ⇒ + scheduleOnce("TestCancelTimer", 1.milli.dilated) + // Likely in mailbox but we cannot guarantee + cancelTimer("TestCancelTimer") + probe ! 
TestCancelTimerAck + scheduleOnce("TestCancelTimer", 500.milli.dilated) + case TestRepeatedTimer ⇒ + schedulePeriodically("TestRepeatedTimer", 100.millis.dilated) + } + } + } + + "GraphStage timer support" must { + + def setupIsolatedStage: SideChannel = { + val channel = new SideChannel + val stopPromise = Source.lazyEmpty[Int].via(new TestStage(testActor, channel)).to(Sink.ignore).run() + channel.stopPromise = stopPromise + awaitCond(channel.isReady) + channel + } + + "receive single-shot timer" in { + val driver = setupIsolatedStage + + within(2 seconds) { + within(500 millis, 1 second) { + driver ! TestSingleTimer + expectMsg(Tick(1)) + } + expectNoMsg(1 second) + } + + driver.stopStage() + } + + "resubmit single-shot timer" in { + val driver = setupIsolatedStage + + within(2.5 seconds) { + within(500 millis, 1 second) { + driver ! TestSingleTimerResubmit + expectMsg(Tick(1)) + } + within(1 second) { + expectMsg(Tick(2)) + } + expectNoMsg(1 second) + } + + driver.stopStage() + } + + "correctly cancel a named timer" in { + val driver = setupIsolatedStage + + driver ! TestCancelTimer + within(500 millis) { + expectMsg(TestCancelTimerAck) + } + within(300 millis, 1 second) { + expectMsg(Tick(1)) + } + expectNoMsg(1 second) + + driver.stopStage() + } + + "receive and cancel a repeated timer" in { + val driver = setupIsolatedStage + + driver ! TestRepeatedTimer + val seq = receiveWhile(2 seconds) { + case t: Tick ⇒ t + } + seq should have length 5 + expectNoMsg(1 second) + + driver.stopStage() + } + + class TestStage2 extends SimpleLinearGraphStage[Int] { + override def createLogic = new SimpleLinearStageLogic { + schedulePeriodically("tick", 100.millis) + var tickCount = 0 + + setHandler(out, new OutHandler { + override def onPull() = () // Do nothing + override def onDownstreamFinish() = completeStage() + }) + + setHandler(in, new InHandler { + override def onPush() = () // Do nothing + override def onUpstreamFinish() = completeStage() + override def onUpstreamFailure(ex: Throwable) = failStage(ex) + }) + + override def onTimer(timerKey: Any) = { + tickCount += 1 + if (isAvailable(out)) push(out, tickCount) + if (tickCount == 3) cancelTimer("tick") + } + + } + } + + "produce scheduled ticks as expected" in assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + Source(upstream).via(new TestStage2).runWith(Sink(downstream)) + + downstream.request(5) + downstream.expectNext(1) + downstream.expectNext(2) + downstream.expectNext(3) + + downstream.expectNoMsg(1.second) + + upstream.sendComplete() + downstream.expectComplete() + } + + "propagate error if onTimer throws an exception" in assertAllStagesStopped { + val exception = TE("Expected exception to the rule") + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + Source(upstream).via(new SimpleLinearGraphStage[Int] { + override def createLogic = new SimpleLinearStageLogic { + scheduleOnce("tick", 100.millis) + + setHandler(in, new InHandler { + override def onPush() = () // Ingore + }) + + override def onTimer(timerKey: Any) = throw exception + } + }).runWith(Sink(downstream)) + + downstream.request(1) + downstream.expectError(exception) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TimerTransformerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TimerTransformerSpec.scala deleted file mode 100644 index 7b5dadb995..0000000000 --- 
a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TimerTransformerSpec.scala +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.scaladsl - -import language.postfixOps -import scala.collection.immutable -import scala.concurrent.duration._ - -import akka.actor.Actor -import akka.actor.ActorCell -import akka.actor.ActorRef -import akka.actor.Props -import akka.stream.TimerTransformer -import akka.stream.TimerTransformer.Scheduled -import akka.stream.testkit.AkkaSpec -import akka.testkit.TestDuration -import akka.testkit.TestKit - -object TimerTransformerSpec { - case object TestSingleTimer - case object TestSingleTimerResubmit - case object TestCancelTimer - case object TestCancelTimerAck - case object TestRepeatedTimer - case class Tick(n: Int) - - def driverProps(probe: ActorRef): Props = - Props(classOf[Driver], probe).withDispatcher("akka.test.stream-dispatcher") - - class Driver(probe: ActorRef) extends Actor { - - // need implicit system for dilated - import context.system - - val tickCount = Iterator from 1 - - val transformer = new TimerTransformer[Int, Int] { - override def onNext(elem: Int): immutable.Seq[Int] = List(elem) - override def onTimer(timerKey: Any): immutable.Seq[Int] = { - val tick = Tick(tickCount.next()) - probe ! tick - if (timerKey == "TestSingleTimerResubmit" && tick.n == 1) - scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated) - else if (timerKey == "TestRepeatedTimer" && tick.n == 5) - cancelTimer("TestRepeatedTimer") - Nil - } - } - - override def preStart(): Unit = { - super.preStart() - transformer.start(context) - } - - override def postStop(): Unit = { - super.postStop() - transformer.stop() - } - - def receive = { - case TestSingleTimer ⇒ - transformer.scheduleOnce("TestSingleTimer", 500.millis.dilated) - case TestSingleTimerResubmit ⇒ - transformer.scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated) - case TestCancelTimer ⇒ - transformer.scheduleOnce("TestCancelTimer", 1.milli.dilated) - TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1.second.dilated) - transformer.cancelTimer("TestCancelTimer") - probe ! TestCancelTimerAck - transformer.scheduleOnce("TestCancelTimer", 500.milli.dilated) - case TestRepeatedTimer ⇒ - transformer.schedulePeriodically("TestRepeatedTimer", 100.millis.dilated) - case s: Scheduled ⇒ transformer.onScheduled(s) - } - } -} - -class TimerTransformerSpec extends AkkaSpec { - import TimerTransformerSpec._ - - "A TimerTransformer" must { - - "receive single-shot timer" in { - val driver = system.actorOf(driverProps(testActor)) - within(2 seconds) { - within(500 millis, 1 second) { - driver ! TestSingleTimer - expectMsg(Tick(1)) - } - expectNoMsg(1 second) - } - } - - "resubmit single-shot timer" in { - val driver = system.actorOf(driverProps(testActor)) - within(2.5 seconds) { - within(500 millis, 1 second) { - driver ! TestSingleTimerResubmit - expectMsg(Tick(1)) - } - within(1 second) { - expectMsg(Tick(2)) - } - expectNoMsg(1 second) - } - } - - "correctly cancel a named timer" in { - val driver = system.actorOf(driverProps(testActor)) - driver ! TestCancelTimer - within(500 millis) { - expectMsg(TestCancelTimerAck) - } - within(300 millis, 1 second) { - expectMsg(Tick(1)) - } - expectNoMsg(1 second) - } - - "receive and cancel a repeated timer" in { - val driver = system.actorOf(driverProps(testActor)) - driver ! 
TestRepeatedTimer - val seq = receiveWhile(2 seconds) { - case t: Tick ⇒ t - } - seq should have length 5 - expectNoMsg(1 second) - } - - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index 2f51fad5d3..056dc4f631 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -3,9 +3,13 @@ */ package akka.stream +import akka.actor.Cancellable + import scala.concurrent.ExecutionContextExecutor import akka.japi +import scala.concurrent.duration.FiniteDuration + abstract class Materializer { /** @@ -31,6 +35,24 @@ abstract class Materializer { */ implicit def executionContext: ExecutionContextExecutor + /** + * Interface for stages that need timer services for their functionality. Schedules a + * single task with the given delay. + * + * @return A [[akka.actor.Cancellable]] that allows cancelling the timer. Cancelling is best effort, if the event + * has been already enqueued it will not have an effect. + */ + def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable + + /** + * Interface for stages that need timer services for their functionality. Schedules a + * repeated task with the given interval between invocations. + * + * @return A [[akka.actor.Cancellable]] that allows cancelling the timer. Cancelling is best effort, if the event + * has been already enqueued it will not have an effect. + */ + def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable + } /** @@ -43,6 +65,12 @@ private[akka] object NoMaterializer extends Materializer { throw new UnsupportedOperationException("NoMaterializer cannot materialize") override def executionContext: ExecutionContextExecutor = throw new UnsupportedOperationException("NoMaterializer does not provide an ExecutionContext") + + def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = + throw new UnsupportedOperationException("NoMaterializer cannot schedule a single event") + + def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable = + throw new UnsupportedOperationException("NoMaterializer cannot schedule a repeated event") } /** diff --git a/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala b/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala deleted file mode 100644 index c87e8addde..0000000000 --- a/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import akka.actor.{ ActorContext, Cancellable } -import scala.collection.{ immutable, mutable } -import scala.concurrent.duration.FiniteDuration -import akka.actor.DeadLetterSuppression - -/** - * Transformer with support for scheduling keyed (named) timer events. 
- */ -// TODO: TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410 -@deprecated("TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410", "1.0-M1") -private[akka] abstract class TimerTransformer[-T, +U] extends TransformerLike[T, U] { - import TimerTransformer._ - private val timers = mutable.Map[Any, Timer]() - private val timerIdGen = Iterator from 1 - - private var context: Option[ActorContext] = None - // when scheduling before `start` we must queue the operations - private var queued = List.empty[Queued] - - /** - * INTERNAL API - */ - private[akka] final def start(ctx: ActorContext): Unit = { - context = Some(ctx) - queued.reverse.foreach { - case QueuedSchedule(timerKey, interval) ⇒ schedulePeriodically(timerKey, interval) - case QueuedScheduleOnce(timerKey, delay) ⇒ scheduleOnce(timerKey, delay) - case QueuedCancelTimer(timerKey) ⇒ cancelTimer(timerKey) - } - queued = Nil - } - - /** - * INTERNAL API - */ - private[akka] final def stop(): Unit = { - timers.foreach { case (_, Timer(_, task)) ⇒ task.cancel() } - timers.clear() - } - - /** - * Schedule timer to call [[#onTimer]] periodically with the given interval. - * Any existing timer with the same key will automatically be canceled before - * adding the new timer. - */ - def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit = - context match { - case Some(ctx) ⇒ - cancelTimer(timerKey) - val id = timerIdGen.next() - val task = ctx.system.scheduler.schedule(interval, interval, ctx.self, - Scheduled(timerKey, id, repeating = true))(ctx.dispatcher) - timers(timerKey) = Timer(id, task) - case None ⇒ - queued = QueuedSchedule(timerKey, interval) :: queued - } - - /** - * Schedule timer to call [[#onTimer]] after given delay. - * Any existing timer with the same key will automatically be canceled before - * adding the new timer. - */ - def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit = - context match { - case Some(ctx) ⇒ - cancelTimer(timerKey) - val id = timerIdGen.next() - val task = ctx.system.scheduler.scheduleOnce(delay, ctx.self, - Scheduled(timerKey, id, repeating = false))(ctx.dispatcher) - timers(timerKey) = Timer(id, task) - case None ⇒ - queued = QueuedScheduleOnce(timerKey, delay) :: queued - } - - /** - * Cancel timer, ensuring that the [[#onTimer]] is not subsequently called. - * @param timerKey key of the timer to cancel - */ - def cancelTimer(timerKey: Any): Unit = - timers.get(timerKey).foreach { t ⇒ - t.task.cancel() - timers -= timerKey - } - - /** - * Inquire whether the timer is still active. Returns true unless the - * timer does not exist, has previously been canceled or if it was a - * single-shot timer that was already triggered. - */ - final def isTimerActive(timerKey: Any): Boolean = timers contains timerKey - - /** - * INTERNAL API - */ - private[akka] def onScheduled(scheduled: Scheduled): immutable.Seq[U] = { - val Id = scheduled.timerId - timers.get(scheduled.timerKey) match { - case Some(Timer(Id, _)) ⇒ - if (!scheduled.repeating) timers -= scheduled.timerKey - onTimer(scheduled.timerKey) - case _ ⇒ Nil // already canceled, or re-scheduled - } - } - - /** - * Will be called when the scheduled timer is triggered. 
- * @param timerKey key of the scheduled timer - */ - def onTimer(timerKey: Any): immutable.Seq[U] -} - -/** - * INTERNAL API - */ -private object TimerTransformer { - final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression - - sealed trait Queued - final case class QueuedSchedule(timerKey: Any, interval: FiniteDuration) extends Queued - final case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued - final case class QueuedCancelTimer(timerKey: Any) extends Queued - - final case class Timer(id: Int, task: Cancellable) - -} - diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index ff6fa75336..5d3206256e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -21,6 +21,7 @@ import akka.stream.stage.Stage import akka.stream.Attributes._ import org.reactivestreams._ +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Await, ExecutionContextExecutor } /** @@ -61,6 +62,12 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem, } } + override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable) = + system.scheduler.schedule(initialDelay, interval, task)(executionContext) + + override def scheduleOnce(delay: FiniteDuration, task: Runnable) = + system.scheduler.scheduleOnce(delay, task)(executionContext) + override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = { if (haveShutDown.get()) throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.") @@ -114,7 +121,7 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem, case graph: GraphModule ⇒ val calculatedSettings = effectiveSettings(effectiveAttributes) - val props = ActorGraphInterpreter.props(graph.assembly, graph.shape, calculatedSettings) + val props = ActorGraphInterpreter.props(graph.assembly, graph.shape, calculatedSettings, ActorMaterializerImpl.this) val impl = actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher) for ((inlet, i) ← graph.shape.inlets.iterator.zipWithIndex) { val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, i) @@ -342,7 +349,6 @@ private[akka] object ActorProcessorFactory { case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ()) case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) case StageFactory(mkStage, _) ⇒ interp(mkStage()) - case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ()) case MaterializingStageFactory(mkStageAndMat, _) ⇒ val s_m = mkStageAndMat() (ActorInterpreter.props(settings, List(s_m._1), materializer, att), s_m._2) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index b235f90f8f..13491c7514 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -6,7 +6,7 @@ package akka.stream.impl import akka.event.LoggingAdapter import akka.stream.impl.SplitDecision.SplitDecision import akka.stream.impl.StreamLayout._ -import akka.stream.{ OverflowStrategy, TimerTransformer, Attributes } +import akka.stream.{ OverflowStrategy, Attributes } import akka.stream.Attributes._ import 
akka.stream.stage.Stage import org.reactivestreams.Processor @@ -101,11 +101,6 @@ private[stream] object Stages { override def carbonCopy: Module = newInstance } - final case class TimerTransform(mkStage: () ⇒ TimerTransformer[Any, Any], attributes: Attributes = timerTransform) extends StageModule { - def withAttributes(attributes: Attributes) = copy(attributes = attributes) - override protected def newInstance: StageModule = this.copy() - } - final case class StageFactory(mkStage: () ⇒ Stage[_, _], attributes: Attributes = stageFactory) extends StageModule { def withAttributes(attributes: Attributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() diff --git a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala deleted file mode 100644 index de00397883..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import akka.stream.ActorMaterializerSettings -import akka.stream.TimerTransformer -import scala.util.control.NonFatal -import akka.actor.{ Deploy, Props } - -private[akka] object TimerTransformerProcessorsImpl { - def props(settings: ActorMaterializerSettings, transformer: TimerTransformer[Any, Any]): Props = - Props(new TimerTransformerProcessorsImpl(settings, transformer)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -private[akka] class TimerTransformerProcessorsImpl( - _settings: ActorMaterializerSettings, - transformer: TimerTransformer[Any, Any]) - extends ActorProcessorImpl(_settings) with Emit { - import TimerTransformer._ - - var errorEvent: Option[Throwable] = None - - override def preStart(): Unit = { - super.preStart() - initialPhase(1, running) - transformer.start(context) - } - - override def postStop(): Unit = - try { - super.postStop() - transformer.stop() - } finally transformer.cleanup() - - override def onError(e: Throwable): Unit = { - try { - transformer.onError(e) - errorEvent = Some(e) - pump() - } catch { case NonFatal(ex) ⇒ fail(ex) } - } - - val schedulerInputs: Inputs = new DefaultInputTransferStates { - val queue = new java.util.LinkedList[Any] - - override def dequeueInputElement(): Any = queue.removeFirst() - - override def subreceive: SubReceive = new SubReceive({ - case s: Scheduled ⇒ - try { - transformer.onScheduled(s) foreach { elem ⇒ - queue.add(elem) - } - pump() - } catch { case NonFatal(ex) ⇒ pumpFailed(ex) } - }) - - override def cancel(): Unit = () - override def isClosed: Boolean = false - override def inputsDepleted: Boolean = false - override def inputsAvailable: Boolean = !queue.isEmpty - } - - override def activeReceive = super.activeReceive.orElse[Any, Unit](schedulerInputs.subreceive) - - object RunningCondition extends TransferState { - def isReady = { - ((primaryInputs.inputsAvailable || schedulerInputs.inputsAvailable || transformer.isComplete) && - primaryOutputs.demandAvailable) || primaryInputs.inputsDepleted - } - def isCompleted = false - } - - private val terminate = TransferPhase(Always) { () ⇒ - emits = transformer.onTermination(errorEvent) - emitAndThen(completedPhase) - } - - private val running: TransferPhase = TransferPhase(RunningCondition) { () ⇒ - if (primaryInputs.inputsDepleted || (transformer.isComplete && !schedulerInputs.inputsAvailable)) { - nextPhase(terminate) - } else if 
(schedulerInputs.inputsAvailable) { - emits = List(schedulerInputs.dequeueInputElement()) - emitAndThen(running) - } else { - emits = transformer.onNext(primaryInputs.dequeueInputElement()) - if (transformer.isComplete) emitAndThen(terminate) - else emitAndThen(running) - } - } - - override def toString: String = s"Transformer(emits=$emits, transformer=$transformer)" - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 39985f0841..f2924f2e08 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -73,8 +73,8 @@ private[stream] object ActorGraphInterpreter { } } - def props(assembly: GraphAssembly, shape: Shape, settings: ActorMaterializerSettings): Props = - Props(new ActorGraphInterpreter(assembly, shape, settings)).withDeploy(Deploy.local) + def props(assembly: GraphAssembly, shape: Shape, settings: ActorMaterializerSettings, mat: Materializer): Props = + Props(new ActorGraphInterpreter(assembly, shape, settings, mat)).withDeploy(Deploy.local) class BatchingActorInputBoundary(size: Int, id: Int) extends UpstreamBoundaryStageLogic[Any] { require(size > 0, "buffer size cannot be zero") @@ -279,10 +279,17 @@ private[stream] object ActorGraphInterpreter { /** * INTERNAL API */ -private[stream] class ActorGraphInterpreter(assembly: GraphAssembly, shape: Shape, settings: ActorMaterializerSettings) extends Actor { +private[stream] class ActorGraphInterpreter( + assembly: GraphAssembly, + shape: Shape, + settings: ActorMaterializerSettings, + mat: Materializer) extends Actor { import ActorGraphInterpreter._ - val interpreter = new GraphInterpreter(assembly, (logic, event, handler) ⇒ self ! AsyncInput(logic, event, handler)) + val interpreter = new GraphInterpreter( + assembly, + mat, + (logic, event, handler) ⇒ self ! 
AsyncInput(logic, event, handler)) val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _)) val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _)) // Limits the number of events processed by the interpreter before scheduling a self-message for fairness with other @@ -324,8 +331,13 @@ private[stream] class ActorGraphInterpreter(assembly: GraphAssembly, shape: Shap if (interpreter.isSuspended) runBatch() case AsyncInput(logic, event, handler) ⇒ if (GraphInterpreter.Debug) println(s"ASYNC $event") - if (!interpreter.isStageCompleted(logic.stageId)) - handler(event) + if (!interpreter.isStageCompleted(logic.stageId)) { + try handler(event) + catch { + case NonFatal(e) ⇒ logic.failStage(e) + } + } + runBatch() // Initialization and completion messages @@ -347,6 +359,7 @@ private[stream] class ActorGraphInterpreter(assembly: GraphAssembly, shape: Shap outputs(id).subscribePending() case ExposedPublisher(id, publisher) ⇒ outputs(id).exposedPublisher(publisher) + } override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 615fa6ea03..00a7d01ef7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -4,7 +4,9 @@ package akka.stream.impl.fusing import akka.stream.stage.{ OutHandler, InHandler, GraphStage, GraphStageLogic } -import akka.stream.{ Shape, Inlet, Outlet } +import akka.stream.{ Materializer, Shape, Inlet, Outlet } + +import scala.util.control.NonFatal /** * INTERNAL API @@ -180,6 +182,7 @@ private[stream] object GraphInterpreter { */ private[stream] final class GraphInterpreter( private val assembly: GraphInterpreter.GraphAssembly, + val materializer: Materializer, val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit) { import GraphInterpreter._ @@ -256,6 +259,7 @@ private[stream] final class GraphInterpreter( var i = 0 while (i < logics.length) { logics(i).stageId = i + logics(i).beforePreStart() logics(i).preStart() i += 1 } @@ -267,7 +271,10 @@ private[stream] final class GraphInterpreter( def finish(): Unit = { var i = 0 while (i < logics.length) { - if (!isStageCompleted(i)) logics(i).postStop() + if (!isStageCompleted(i)) { + logics(i).postStop() + logics(i).afterPostStop() + } i += 1 } } @@ -290,7 +297,19 @@ private[stream] final class GraphInterpreter( var eventsRemaining = eventLimit var connection = dequeue() while (eventsRemaining > 0 && connection != NoEvent) { - processEvent(connection) + try processEvent(connection) + catch { + case NonFatal(e) ⇒ + val stageId = connectionStates(connection) match { + case Failed(ex) ⇒ throw new IllegalStateException("Double fault. 
Failure while handling failure.", e) + case Pushable ⇒ assembly.outOwners(connection) + case Completed ⇒ assembly.inOwners(connection) + case Cancelled ⇒ assembly.outOwners(connection) + case PushCompleted(elem) ⇒ assembly.inOwners(connection) + case pushedElem ⇒ assembly.inOwners(connection) + } + logics(stageId).failStage(e) + } eventsRemaining -= 1 if (eventsRemaining > 0) connection = dequeue() } @@ -392,6 +411,7 @@ private[stream] final class GraphInterpreter( if (activeConnections == 1) { runningStages -= 1 logics(stageId).postStop() + logics(stageId).afterPostStop() } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index c9c3f8ebdc..00e544f0e7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -11,23 +11,31 @@ import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } */ object GraphStages { - class Identity[T] extends GraphStage[FlowShape[T, T]] { + /** + * INERNAL API + */ + private[stream] abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] { val in = Inlet[T]("in") val out = Outlet[T]("out") - override val shape = FlowShape(in, out) - override def createLogic: GraphStageLogic = new GraphStageLogic { + protected abstract class SimpleLinearStageLogic extends GraphStageLogic { + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + override def onDownstreamFinish(): Unit = completeStage() + }) + } + + } + + class Identity[T] extends SimpleLinearGraphStage[T] { + + override def createLogic: GraphStageLogic = new SimpleLinearStageLogic { setHandler(in, new InHandler { override def onPush(): Unit = push(out, grab(in)) override def onUpstreamFinish(): Unit = completeStage() override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) - - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - override def onDownstreamFinish(): Unit = completeStage() - }) } override def toString = "Identity" diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 5893adf535..4538f3a957 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -6,13 +6,16 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.LogLevels +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ import akka.stream.{ Supervision, _ } import scala.annotation.tailrec import scala.collection.immutable +import scala.collection.immutable.VectorBuilder import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } @@ -726,3 +729,121 @@ private[akka] object Log { private final val OffInt = LogLevels.Off.asInt private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel) } + +/** + * INTERNAL API + */ +private[stream] object TimerKeys { + case object TakeWithinTimerKey + case object DropWithinTimerKey + case object GroupedWithinTimerKey +} + +private[stream] class 
GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { + val in = Inlet[T]("in") + val out = Outlet[immutable.Seq[T]]("out") + val shape = FlowShape(in, out) + + override def createLogic: GraphStageLogic = new GraphStageLogic { + private val buf: VectorBuilder[T] = new VectorBuilder + // True if: + // - buf is nonEmpty + // AND + // - timer fired OR group is full + private var groupClosed = false + private var finished = false + private var elements = 0 + + private val GroupedWithinTimer = "GroupedWithinTimer" + + override def preStart() = { + schedulePeriodically(GroupedWithinTimer, d) + pull(in) + } + + private def nextElement(elem: T): Unit = { + buf += elem + elements += 1 + if (elements == n) { + schedulePeriodically(GroupedWithinTimer, d) + closeGroup() + } else pull(in) + } + + private def closeGroup(): Unit = { + groupClosed = true + if (isAvailable(out)) emitGroup() + } + + private def emitGroup(): Unit = { + push(out, buf.result()) + buf.clear() + if (!finished) startNewGroup() + else completeStage() + } + + private def startNewGroup(): Unit = { + elements = 0 + groupClosed = false + if (isAvailable(in)) nextElement(grab(in)) + else if (!hasBeenPulled(in)) pull(in) + } + + setHandler(in, new InHandler { + override def onPush(): Unit = + if (!groupClosed) nextElement(grab(in)) // otherwise keep the element for next round + override def onUpstreamFinish(): Unit = { + finished = true + if (!groupClosed && elements > 0) closeGroup() + else completeStage() + } + override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = if (groupClosed) emitGroup() + override def onDownstreamFinish(): Unit = completeStage() + }) + + override protected def onTimer(timerKey: Any) = + if (elements > 0) closeGroup() + } +} + +private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + + override def createLogic: GraphStageLogic = new SimpleLinearStageLogic { + setHandler(in, new InHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onUpstreamFinish(): Unit = completeStage() + override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) + }) + + final override protected def onTimer(key: Any): Unit = + completeStage() + + scheduleOnce("TakeWithinTimer", timeout) + } + + override def toString = "TakeWithin" +} + +private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + private var allow = false + + override def createLogic: GraphStageLogic = new SimpleLinearStageLogic { + setHandler(in, new InHandler { + override def onPush(): Unit = + if (allow) push(out, grab(in)) + else pull(in) + override def onUpstreamFinish(): Unit = completeStage() + override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) + }) + + final override protected def onTimer(key: Any): Unit = allow = true + + scheduleOnce("DropWithinTimer", timeout) + } + + override def toString = "DropWithin" +} diff --git a/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala b/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala new file mode 100644 index 0000000000..a373129a4d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala @@ -0,0 +1,171 @@ +package akka.stream.io + +import java.util.concurrent.{ TimeUnit, TimeoutException } + +import akka.actor.{ Cancellable, ActorSystem } +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.{ FlowShape, 
Outlet, Inlet, BidiShape }
+import akka.stream.scaladsl.{ BidiFlow, Flow }
+import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
+
+import scala.concurrent.duration.{ Deadline, FiniteDuration }
+
+/**
+ * Various stages for controlling timeouts on IO related streams (although not necessarily limited to IO).
+ *
+ * The common theme among the processing stages here is that
+ * - they wait for a certain event or events to happen
+ * - they have a timer that may fire before these events
+ * - if the timer fires before the event happens, these stages all fail the stream
+ * - otherwise, these stages do not interfere with the element flow, ordinary completion or failure
+ */
+object Timeouts {
+
+  /**
+   * If the first element has not passed through this stage before the provided timeout, the stream is failed
+   * with a [[TimeoutException]].
+   */
+  def initalTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
+    Flow.wrap(new InitialTimeout[T](timeout))
+
+  /**
+   * If the stream is not completed within the provided timeout, it is failed
+   * with a [[TimeoutException]].
+   */
+  def completionTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
+    Flow.wrap(new CompletionTimeout[T](timeout))
+
+  /**
+   * If the time between two processed elements exceeds the provided timeout, the stream is failed
+   * with a [[TimeoutException]].
+   */
+  def idleTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
+    Flow.wrap(new IdleTimeout[T](timeout))
+
+  /**
+   * If the time between two processed elements *in any direction* exceeds the provided timeout, the stream is failed
+   * with a [[TimeoutException]].
+   *
+   * There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage.
+   * If the timeout is configured to be 1 second, then this stage will not fail even though there are elements flowing
+   * every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers
+   * the *joint* frequencies of the elements in both directions.
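+   *
+   * A minimal usage sketch (assuming a hypothetical `codec` [[BidiFlow]] whose wire-facing sides carry
+   * [[akka.util.ByteString]], and arbitrary timeout values):
+   * {{{
+   *   import scala.concurrent.duration._
+   *   import akka.util.ByteString
+   *
+   *   // linear timeouts are attached with `via`
+   *   val guarded = Flow[ByteString].via(Timeouts.idleTimeout[ByteString](10.seconds))
+   *
+   *   // the bidirectional variant is typically stacked atop a codec-style BidiFlow,
+   *   // so that traffic in either direction resets the shared deadline
+   *   val protectedCodec = codec.atop(Timeouts.idleTimeoutBidi[ByteString, ByteString](5.seconds))
+   * }}}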
+ */ + def idleTimeoutBidi[A, B](timeout: FiniteDuration): BidiFlow[A, A, B, B, Unit] = + BidiFlow.wrap(new IdleTimeoutBidi[A, B](timeout)) + + private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { + import scala.concurrent.duration._ + FiniteDuration( + math.min(math.max(timeout.toNanos / 8, 100.millis.toNanos), timeout.toNanos / 2), + TimeUnit.NANOSECONDS) + } + + private class InitialTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + + override def createLogic: GraphStageLogic = new SimpleLinearStageLogic { + private var initialHasPassed = false + + setHandler(in, new InHandler { + override def onPush(): Unit = { + initialHasPassed = true + push(out, grab(in)) + } + }) + + final override protected def onTimer(key: Any): Unit = + if (!initialHasPassed) + failStage(new TimeoutException(s"The first element has not yet passed through in $timeout.")) + + scheduleOnce("InitialTimeout", timeout) + } + + override def toString = "InitialTimeoutTimer" + } + + private class CompletionTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + + override def createLogic: GraphStageLogic = new SimpleLinearStageLogic { + setHandler(in, new InHandler { + override def onPush(): Unit = push(out, grab(in)) + }) + + final override protected def onTimer(key: Any): Unit = + failStage(new TimeoutException(s"The stream has not been completed in $timeout.")) + + scheduleOnce("CompletionTimeoutTimer", timeout) + } + + override def toString = "CompletionTimeout" + } + + private class IdleTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + private var nextDeadline: Deadline = Deadline.now + timeout + + override def createLogic: GraphStageLogic = new SimpleLinearStageLogic { + setHandler(in, new InHandler { + override def onPush(): Unit = { + nextDeadline = Deadline.now + timeout + push(out, grab(in)) + } + }) + + final override protected def onTimer(key: Any): Unit = + if (nextDeadline.isOverdue()) + failStage(new TimeoutException(s"No elements passed in the last $timeout.")) + + schedulePeriodically("IdleTimeoutCheckTimer", interval = idleTimeoutCheckInterval(timeout)) + } + + override def toString = "IdleTimeout" + } + + private class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { + val in1 = Inlet[I]("in1") + val in2 = Inlet[O]("in2") + val out1 = Outlet[I]("out1") + val out2 = Outlet[O]("out2") + val shape = BidiShape(in1, out1, in2, out2) + + override def toString = "IdleTimeoutBidi" + + override def createLogic: GraphStageLogic = new GraphStageLogic { + private var nextDeadline: Deadline = Deadline.now + timeout + + setHandler(in1, new InHandler { + override def onPush(): Unit = { + onActivity() + push(out1, grab(in1)) + } + override def onUpstreamFinish(): Unit = complete(out1) + }) + + setHandler(in2, new InHandler { + override def onPush(): Unit = { + onActivity() + push(out2, grab(in2)) + } + override def onUpstreamFinish(): Unit = complete(out2) + }) + + setHandler(out1, new OutHandler { + override def onPull(): Unit = pull(in1) + override def onDownstreamFinish(): Unit = cancel(in1) + }) + + setHandler(out2, new OutHandler { + override def onPull(): Unit = pull(in2) + override def onDownstreamFinish(): Unit = cancel(in2) + }) + + private def onActivity(): Unit = nextDeadline = Deadline.now + timeout + + final override def onTimer(key: Any): Unit = + if (nextDeadline.isOverdue()) + failStage(new TimeoutException(s"No elements passed in the last $timeout.")) + + 
schedulePeriodically("IdleTimeoutCheckTimer", idleTimeoutCheckInterval(timeout)) + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index bcd719fe53..720d970daf 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -9,6 +9,7 @@ import akka.stream._ import akka.stream.impl.SplitDecision._ import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } +import akka.stream.impl.fusing.{ DropWithin, TakeWithin, GroupedWithin } import akka.stream.impl.{ Stages, StreamLayout } import akka.stream.stage._ import akka.util.Collections.EmptyImmutableSeq @@ -32,41 +33,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) private[stream] def isIdentity: Boolean = this.module.isInstanceOf[Stages.Identity] - /** - * Transform this [[Flow]] by appending the given processing steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * [[Flow#viaMat viaMat]] if a different strategy is needed. - */ - def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Flow[In, T, Mat] = viaMat(flow)(Keep.left) - - /** - * Transform this [[Flow]] by appending the given processing steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The `combine` function is used to compose the materialized values of this flow and that - * flow into the materialized value of the resulting Flow. - */ def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = { if (this.isIdentity) { val flowInstance: Flow[In, T, Mat2] = if (flow.isInstanceOf[javadsl.Flow[In, T, Mat2]]) @@ -378,12 +344,48 @@ case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) * Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only. */ trait FlowOps[+Out, +Mat] { - import FlowOps._ import akka.stream.impl.Stages._ type Repr[+O, +M] <: FlowOps[O, M] private final val _identity = (x: Any) ⇒ x + /** + * Transform this [[Flow]] by appending the given processing steps. + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[Flow#viaMat viaMat]] if a different strategy is needed. + */ + def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T, Mat] = viaMat(flow)(Keep.left) + + /** + * Transform this [[Flow]] by appending the given processing steps. 
+ * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting Flow. + */ + def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Repr[T, Mat3] + /** * Recover allows to send last element on failure and gracefully complete the stream * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. @@ -650,31 +652,10 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream completes */ - def groupedWithin(n: Int, d: FiniteDuration): Repr[Out, Mat]#Repr[immutable.Seq[Out], Mat] = { + def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out], Mat] = { require(n > 0, "n must be greater than 0") require(d > Duration.Zero) - withAttributes(name("groupedWithin")).timerTransform(() ⇒ new TimerTransformer[Out, immutable.Seq[Out]] { - schedulePeriodically(GroupedWithinTimerKey, d) - var buf: Vector[Out] = Vector.empty - - def onNext(in: Out) = { - buf :+= in - if (buf.size == n) { - // start new time window - schedulePeriodically(GroupedWithinTimerKey, d) - emitGroup() - } else Nil - } - override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) - def onTimer(timerKey: Any) = emitGroup() - private def emitGroup(): immutable.Seq[immutable.Seq[Out]] = - if (buf.isEmpty) EmptyImmutableSeq - else { - val group = buf - buf = Vector.empty - List(group) - } - }) + via(new GroupedWithin[Out](n, d).withAttributes(name("groupedWithin"))) } /** @@ -702,21 +683,8 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def dropWithin(d: FiniteDuration): Repr[Out, Mat]#Repr[Out, Mat] = - withAttributes(name("dropWithin")).timerTransform(() ⇒ new TimerTransformer[Out, Out] { - scheduleOnce(DropWithinTimerKey, d) - - var delegate: TransformerLike[Out, Out] = - new TransformerLike[Out, Out] { - def onNext(in: Out) = Nil - } - - def onNext(in: Out) = delegate.onNext(in) - def onTimer(timerKey: Any) = { - delegate = FlowOps.identityTransformer[Out] - Nil - } - }) + def dropWithin(d: FiniteDuration): Repr[Out, Mat] = + via(new DropWithin[Out](d).withAttributes(name("dropWithin"))) /** * Terminate processing (and cancel the upstream publisher) after the given @@ -754,19 +722,8 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels or timer fires */ - def takeWithin(d: FiniteDuration): Repr[Out, Mat]#Repr[Out, Mat] = - withAttributes(name("takeWithin")).timerTransform(() ⇒ new TimerTransformer[Out, Out] { - scheduleOnce(TakeWithinTimerKey, d) - - var delegate: TransformerLike[Out, Out] = FlowOps.identityTransformer[Out] - - override def onNext(in: Out) = delegate.onNext(in) - override def isComplete = delegate.isComplete - override def onTimer(timerKey: Any) = { - delegate = FlowOps.completedTransformer[Out] - Nil - } - }) + def takeWithin(d: FiniteDuration): Repr[Out, Mat] = + via(new TakeWithin[Out](d).withAttributes(name("takeWithin"))) /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary @@ -1008,35 +965,6 @@ trait FlowOps[+Out, +Mat] { throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") } - /** - * INTERNAL API - meant 
for removal / rewrite. See https://github.com/akka/akka/issues/16393 - * - * Transformation of a stream, with additional support for scheduled events. - * - * For each element the [[akka.stream.TransformerLike#onNext]] - * function is invoked, expecting a (possibly empty) sequence of output elements - * to be produced. - * After handing off the elements produced from one input element to the downstream - * subscribers, the [[akka.stream.TransformerLike#isComplete]] predicate determines whether to end - * stream processing at this point; in that case the upstream subscription is - * canceled. Before signaling normal completion to the downstream subscribers, - * the [[akka.stream.TransformerLike#onTermination]] function is invoked to produce a (possibly empty) - * sequence of elements in response to the end-of-stream event. - * - * [[akka.stream.TransformerLike#onError]] is called when failure is signaled from upstream. - * - * After normal completion or failure the [[akka.stream.TransformerLike#cleanup]] function is called. - * - * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with - * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and - * therefore you do not have to add any additional thread safety or memory - * visibility constructs to access the state from the callback methods. - * - * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. - */ - private[akka] def timerTransform[U](mkStage: () ⇒ TimerTransformer[Out, U]): Repr[U, Mat] = - andThen(TimerTransform(mkStage.asInstanceOf[() ⇒ TimerTransformer[Any, Any]])) - /** * Logs elements flowing through the stream as well as completion and erroring. * @@ -1065,24 +993,3 @@ trait FlowOps[+Out, +Mat] { private[scaladsl] def andThenMat[U, Mat2](op: MaterializingStageFactory): Repr[U, Mat2] } -/** - * INTERNAL API - */ -private[stream] object FlowOps { - private case object TakeWithinTimerKey - private case object DropWithinTimerKey - private case object GroupedWithinTimerKey - - private[this] final case object CompletedTransformer extends TransformerLike[Any, Any] { - override def onNext(elem: Any) = Nil - override def isComplete = true - } - - private[this] final case object IdentityTransformer extends TransformerLike[Any, Any] { - override def onNext(elem: Any) = List(elem) - } - - def completedTransformer[T]: TransformerLike[T, T] = CompletedTransformer.asInstanceOf[TransformerLike[T, T]] - def identityTransformer[T]: TransformerLike[T, T] = IdentityTransformer.asInstanceOf[TransformerLike[T, T]] - -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 05c89290ad..0289a1e724 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -572,6 +572,8 @@ object FlowGraph extends GraphApply { } } + // Although Mat is always Unit, it cannot be removed as a type parameter, otherwise the "override type" + // won't work below class PortOps[Out, Mat](val outlet: Outlet[Out], b: Builder[_]) extends FlowOps[Out, Mat] with CombinerBase[Out] { override type Repr[+O, +M] = PortOps[O, M] @uncheckedVariance @@ -590,10 +592,19 @@ object FlowGraph extends GraphApply { } override def importAndGetPort(b: Builder[_]): Outlet[Out] = outlet + + override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T, Mat] = + 
super.~>(flow)(b).asInstanceOf[Repr[T, Mat]] + + override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3) = + throw new UnsupportedOperationException("Cannot use viaMat on a port") } class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) { override def importAndGetPort(b: Builder[_]): Outlet[Out] = throw new IllegalArgumentException(msg) + + override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3) = + throw new IllegalArgumentException(msg) } implicit class ReversePortOps[In](val inlet: Inlet[In]) extends ReverseCombinerBase[In] { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index f3951a5f1b..3c80dd0a1d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -29,14 +29,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) override val shape: SourceShape[Out] = module.shape.asInstanceOf[SourceShape[Out]] - /** - * Transform this [[akka.stream.scaladsl.Source]] by appending the given processing stages. - */ - def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Source[T, Mat] = viaMat(flow)(Keep.left) - - /** - * Transform this [[akka.stream.scaladsl.Source]] by appending the given processing stages. - */ def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = { if (flow.module.isInstanceOf[Stages.Identity]) this.asInstanceOf[Source[T, Mat3]] else { diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index f007bdb302..e1a9b6a9f4 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -3,11 +3,15 @@ */ package akka.stream.stage +import akka.actor.{ Cancellable, DeadLetterSuppression } import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.{ GraphModule, GraphInterpreter } import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly +import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration + /** * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing @@ -51,6 +55,17 @@ abstract class GraphStage[S <: Shape] extends Graph[S, Unit] { } } +private object TimerMessages { + final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression + + sealed trait Queued + final case class QueuedSchedule(timerKey: Any, initialDelay: FiniteDuration, interval: FiniteDuration) extends Queued + final case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued + final case class QueuedCancelTimer(timerKey: Any) extends Queued + + final case class Timer(id: Int, task: Cancellable) +} + /** * Represents the processing logic behind a [[GraphStage]]. 
Roughly speaking, a subclass of [[GraphStageLogic]] is a * collection of the following parts: @@ -66,6 +81,19 @@ abstract class GraphStage[S <: Shape] extends Graph[S, Unit] { */ abstract class GraphStageLogic { import GraphInterpreter._ + import TimerMessages._ + + private val keyToTimers = mutable.Map[Any, Timer]() + private val timerIdGen = Iterator from 1 + private var queuedTimerEvents = List.empty[Queued] + + private var _timerAsyncCallback: AsyncCallback[Scheduled] = _ + private def getTimerAsyncCallback: AsyncCallback[Scheduled] = { + if (_timerAsyncCallback eq null) + _timerAsyncCallback = getAsyncCallback(onInternalTimer) + + _timerAsyncCallback + } /** * INTERNAL API @@ -98,11 +126,17 @@ abstract class GraphStageLogic { /** * Assigns callbacks for the events for an [[Inlet]] */ - final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = inHandlers += in -> handler + final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = { + handler.ownerStageLogic = this + inHandlers += in -> handler + } /** * Assigns callbacks for the events for an [[Outlet]] */ - final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = outHandlers += out -> handler + final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = { + handler.ownerStageLogic = this + outHandlers += out -> handler + } private def conn[T](in: Inlet[T]): Int = inToConn(in) private def conn[T](out: Outlet[T]): Int = outToConn(out) @@ -112,7 +146,7 @@ abstract class GraphStageLogic { * There can only be one outstanding request at any given time. The method [[hasBeenPulled()]] can be used * query whether pull is allowed to be called or not. */ - final def pull[T](in: Inlet[T]): Unit = { + final protected def pull[T](in: Inlet[T]): Unit = { require(!hasBeenPulled(in), "Cannot pull port twice") interpreter.pull(conn(in)) } @@ -120,7 +154,7 @@ abstract class GraphStageLogic { /** * Requests to stop receiving events from a given input port. */ - final def cancel[T](in: Inlet[T]): Unit = interpreter.cancel(conn(in)) + final protected def cancel[T](in: Inlet[T]): Unit = interpreter.cancel(conn(in)) /** * Once the callback [[InHandler.onPush()]] for an input port has been invoked, the element that has been pushed @@ -129,7 +163,7 @@ abstract class GraphStageLogic { * * The method [[isAvailable()]] can be used to query if the port has an element that can be grabbed or not. */ - final def grab[T](in: Inlet[T]): T = { + final protected def grab[T](in: Inlet[T]): T = { require(isAvailable(in), "Cannot get element from already empty input port") val connection = conn(in) val elem = interpreter.connectionStates(connection) @@ -141,7 +175,7 @@ abstract class GraphStageLogic { * Indicates whether there is already a pending pull for the given input port. If this method returns true * then [[isAvailable()]] must return false for that same port. */ - final def hasBeenPulled[T](in: Inlet[T]): Boolean = !interpreter.inAvailable(conn(in)) + final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = !interpreter.inAvailable(conn(in)) /** * Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the @@ -149,7 +183,7 @@ abstract class GraphStageLogic { * * If this method returns true then [[hasBeenPulled()]] will return false for that same port. 
*/ - final def isAvailable[T](in: Inlet[T]): Boolean = { + final protected def isAvailable[T](in: Inlet[T]): Boolean = { val connection = conn(in) interpreter.inAvailable(connection) && !(interpreter.connectionStates(connection) == Empty) } @@ -159,7 +193,7 @@ abstract class GraphStageLogic { * will fail. There can be only one outstanding push request at any given time. The method [[isAvailable()]] can be * used to check if the port is ready to be pushed or not. */ - final def push[T](out: Outlet[T], elem: T): Unit = { + final protected def push[T](out: Outlet[T], elem: T): Unit = { require(isAvailable(out), "Cannot push port twice") interpreter.push(conn(out), elem) } @@ -167,12 +201,12 @@ abstract class GraphStageLogic { /** * Signals that there will be no more elements emitted on the given port. */ - final def complete[T](out: Outlet[T]): Unit = interpreter.complete(conn(out)) + final protected def complete[T](out: Outlet[T]): Unit = interpreter.complete(conn(out)) /** * Signals failure through the given port. */ - final def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex) + final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex) /** * Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, @@ -213,6 +247,103 @@ abstract class GraphStageLogic { } } + private def onInternalTimer(scheduled: Scheduled): Unit = { + val Id = scheduled.timerId + keyToTimers.get(scheduled.timerKey) match { + case Some(Timer(Id, _)) ⇒ + if (!scheduled.repeating) keyToTimers -= scheduled.timerKey + onTimer(scheduled.timerKey) + case _ ⇒ + } + } + + /** + * Schedule timer to call [[#onTimer]] periodically with the given interval. + * Any existing timer with the same key will automatically be canceled before + * adding the new timer. + */ + final protected def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit = + schedulePeriodicallyWithInitialDelay(timerKey, interval, interval) + + /** + * Schedule timer to call [[#onTimer]] periodically with the given interval after the specified + * initial delay. + * Any existing timer with the same key will automatically be canceled before + * adding the new timer. + */ + final protected def schedulePeriodicallyWithInitialDelay( + timerKey: Any, + initialDelay: FiniteDuration, + interval: FiniteDuration): Unit = { + if (interpreter ne null) { + cancelTimer(timerKey) + val id = timerIdGen.next() + val task = interpreter.materializer.schedulePeriodically(initialDelay, interval, new Runnable { + def run() = getTimerAsyncCallback.invoke(Scheduled(timerKey, id, repeating = true)) + }) + keyToTimers(timerKey) = Timer(id, task) + } else { + queuedTimerEvents = QueuedSchedule(timerKey, initialDelay, interval) :: queuedTimerEvents + } + } + + /** + * Schedule timer to call [[#onTimer]] after given delay. + * Any existing timer with the same key will automatically be canceled before + * adding the new timer. 
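+   *
+   * A minimal sketch of one-shot timer usage inside a [[GraphStageLogic]] (here `in`, `out` and `timeout`
+   * are assumed to be provided by the enclosing stage):
+   * {{{
+   *   scheduleOnce("firstElementDeadline", timeout)
+   *
+   *   setHandler(in, new InHandler {
+   *     override def onPush(): Unit = {
+   *       cancelTimer("firstElementDeadline")
+   *       push(out, grab(in))
+   *     }
+   *   })
+   *
+   *   setHandler(out, new OutHandler {
+   *     override def onPull(): Unit = pull(in)
+   *   })
+   *
+   *   override protected def onTimer(timerKey: Any): Unit =
+   *     failStage(new java.util.concurrent.TimeoutException(s"No element arrived within $timeout"))
+   * }}}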
+ */ + final protected def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit = { + if (interpreter ne null) { + cancelTimer(timerKey) + val id = timerIdGen.next() + val task = interpreter.materializer.scheduleOnce(delay, new Runnable { + def run() = getTimerAsyncCallback.invoke(Scheduled(timerKey, id, repeating = false)) + }) + keyToTimers(timerKey) = Timer(id, task) + } else { + queuedTimerEvents = QueuedScheduleOnce(timerKey, delay) :: queuedTimerEvents + } + } + + /** + * Cancel timer, ensuring that the [[#onTimer]] is not subsequently called. + * @param timerKey key of the timer to cancel + */ + final protected def cancelTimer(timerKey: Any): Unit = + keyToTimers.get(timerKey).foreach { t ⇒ + t.task.cancel() + keyToTimers -= timerKey + } + + /** + * Inquire whether the timer is still active. Returns true unless the + * timer does not exist, has previously been canceled or if it was a + * single-shot timer that was already triggered. + */ + final protected def isTimerActive(timerKey: Any): Boolean = keyToTimers contains timerKey + + /** + * Will be called when the scheduled timer is triggered. + * @param timerKey key of the scheduled timer + */ + protected def onTimer(timerKey: Any): Unit = () + + // Internal hooks to avoid reliance on user calling super in preStart + protected[stream] def beforePreStart(): Unit = { + queuedTimerEvents.reverse.foreach { + case QueuedSchedule(timerKey, delay, interval) ⇒ schedulePeriodicallyWithInitialDelay(timerKey, delay, interval) + case QueuedScheduleOnce(timerKey, delay) ⇒ scheduleOnce(timerKey, delay) + case QueuedCancelTimer(timerKey) ⇒ cancelTimer(timerKey) + } + queuedTimerEvents = Nil + } + + // Internal hooks to avoid reliance on user calling super in postStop + protected[stream] def afterPostStop(): Unit = { + keyToTimers.foreach { case (_, Timer(_, task)) ⇒ task.cancel() } + keyToTimers.clear() + } + /** * Invoked before any external events are processed, at the startup of the stage. */ @@ -228,6 +359,11 @@ abstract class GraphStageLogic { * Collection of callbacks for an input port of a [[GraphStage]] */ trait InHandler { + /** + * INTERNAL API + */ + private[stream] var ownerStageLogic: GraphStageLogic = _ + /** * Called when the input port has a new element available. The actual element can be retrieved via the * [[GraphStageLogic.grab()]] method. @@ -237,18 +373,23 @@ trait InHandler { /** * Called when the input port is finished. After this callback no other callbacks will be called for this port. */ - def onUpstreamFinish(): Unit = () + def onUpstreamFinish(): Unit = ownerStageLogic.completeStage() /** * Called when the input port has failed. After this callback no other callbacks will be called for this port. */ - def onUpstreamFailure(ex: Throwable): Unit = () + def onUpstreamFailure(ex: Throwable): Unit = ownerStageLogic.failStage(ex) } /** * Collection of callbacks for an output port of a [[GraphStage]] */ trait OutHandler { + /** + * INTERNAL API + */ + private[stream] var ownerStageLogic: GraphStageLogic = _ + /** * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]] * is now allowed to be called on this port. @@ -259,5 +400,5 @@ trait OutHandler { * Called when the output port will no longer accept any new elements. After this callback no other callbacks will * be called for this port. 
*/ - def onDownstreamFinish(): Unit = () + def onDownstreamFinish(): Unit = ownerStageLogic.completeStage() } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 34b3e3377b..680964fa92 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -3,7 +3,7 @@ */ package akka.stream.stage -import akka.stream.{ Materializer, Attributes, Supervision } +import akka.stream.{ Attributes, Materializer, Supervision } /** * General interface for stream transformation.