!str #16410 #16597: Removed TimerTransform, added Timeout stages

This commit is contained in:
Endre Sándor Varga 2015-09-11 15:50:17 +02:00
parent d5b5e6f22d
commit 12c9abb8c9
25 changed files with 1185 additions and 679 deletions

View file

@ -127,6 +127,53 @@ package object util {
package util {
import akka.http.scaladsl.model.{ ContentType, HttpEntity }
import akka.stream.{ Outlet, Inlet, FlowShape }
import scala.concurrent.duration.FiniteDuration
private[http] class ToStrict(timeout: FiniteDuration, contentType: ContentType)
extends GraphStage[FlowShape[ByteString, HttpEntity.Strict]] {
val in = Inlet[ByteString]("in")
val out = Outlet[HttpEntity.Strict]("out")
override val shape = FlowShape(in, out)
override def createLogic: GraphStageLogic = new GraphStageLogic {
var bytes = ByteString.newBuilder
private var emptyStream = false
scheduleOnce("ToStrictTimeoutTimer", timeout)
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (emptyStream) {
push(out, HttpEntity.Strict(contentType, ByteString.empty))
completeStage()
} else pull(in)
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
bytes ++= grab(in)
pull(in)
}
override def onUpstreamFinish(): Unit = {
if (isAvailable(out)) {
push(out, HttpEntity.Strict(contentType, bytes.result()))
completeStage()
} else emptyStream = true
}
})
override def onTimer(key: Any): Unit =
failStage(new java.util.concurrent.TimeoutException(
s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data"))
}
override def toString = "ToStrict"
}
private[http] class EventStreamLogger extends Actor with ActorLogging {
def receive = { case x log.warning(x.toString) }
}

View file

@ -13,11 +13,10 @@ import scala.concurrent.Future
import scala.concurrent.duration._
import scala.collection.immutable
import akka.util.ByteString
import akka.stream.Materializer
import akka.stream.{ ActorMaterializer, Materializer }
import akka.stream.scaladsl._
import akka.stream.io.SynchronousFileSource
import akka.stream.io.{ Timeouts, SynchronousFileSource }
import akka.{ japi, stream }
import akka.stream.TimerTransformer
import akka.http.scaladsl.util.FastFuture
import akka.http.javadsl.{ model jm }
import akka.http.impl.util.JavaMapping.Implicits._
@ -55,28 +54,10 @@ sealed trait HttpEntity extends jm.HttpEntity {
* Collects all possible parts and returns a potentially future Strict entity for easier processing.
* The Future is failed with an TimeoutException if the stream isn't completed after the given timeout.
*/
def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] = {
def transformer() =
new TimerTransformer[ByteString, HttpEntity.Strict] {
var bytes = ByteString.newBuilder
scheduleOnce("", timeout)
def onNext(element: ByteString): immutable.Seq[HttpEntity.Strict] = {
bytes ++= element
Nil
}
override def onTermination(e: Option[Throwable]): immutable.Seq[HttpEntity.Strict] =
HttpEntity.Strict(contentType, bytes.result()) :: Nil
def onTimer(timerKey: Any): immutable.Seq[HttpEntity.Strict] =
throw new java.util.concurrent.TimeoutException(
s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data")
}
// TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393
dataBytes.via(Flow[ByteString].timerTransform(transformer).named("toStrict")).runWith(Sink.head)
}
def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] =
dataBytes
.via(new akka.http.impl.util.ToStrict(timeout, contentType))
.runWith(Sink.head)
/**
* Returns a copy of the given entity with the ByteString chunks of this entity transformed by the given transformer.

View file

@ -80,6 +80,52 @@ class ActorGraphInterpreterSpec extends AkkaSpec {
}
"be able to interpret and reuse a simple bidi stage" in assertAllStagesStopped {
val identityBidi = new GraphStage[BidiShape[Int, Int, Int, Int]] {
val in1 = Inlet[Int]("in1")
val in2 = Inlet[Int]("in2")
val out1 = Outlet[Int]("out1")
val out2 = Outlet[Int]("out2")
val shape = BidiShape(in1, out1, in2, out2)
override def createLogic: GraphStageLogic = new GraphStageLogic {
setHandler(in1, new InHandler {
override def onPush(): Unit = push(out1, grab(in1))
override def onUpstreamFinish(): Unit = complete(out1)
})
setHandler(in2, new InHandler {
override def onPush(): Unit = push(out2, grab(in2))
override def onUpstreamFinish(): Unit = complete(out2)
})
setHandler(out1, new OutHandler {
override def onPull(): Unit = pull(in1)
override def onDownstreamFinish(): Unit = cancel(in1)
})
setHandler(out2, new OutHandler {
override def onPull(): Unit = pull(in2)
override def onDownstreamFinish(): Unit = cancel(in2)
})
}
override def toString = "IdentityBidi"
}
val identityBidiF = BidiFlow.wrap(identityBidi)
val identity = (identityBidiF atop identityBidiF atop identityBidiF).join(Flow[Int].map { x x })
Await.result(
Source(1 to 10).via(identity).grouped(100).runWith(Sink.head),
3.seconds) should ===(1 to 10)
}
"be able to interpret and resuse a simple bidi stage" in assertAllStagesStopped {
val identityBidi = new GraphStage[BidiShape[Int, Int, Int, Int]] {
val in1 = Inlet[Int]("in1")

View file

@ -401,7 +401,7 @@ object GraphInterpreterSpec {
(Vector.fill(upstreams.size)(null) ++ outs).toArray,
(Vector.fill(upstreams.size)(-1) ++ outOwners).toArray)
_interpreter = new GraphInterpreter(assembly, (_, _, _) ())
_interpreter = new GraphInterpreter(assembly, NoMaterializer, (_, _, _) ())
for ((upstream, i) upstreams.zipWithIndex) {
_interpreter.attachUpstreamBoundary(i, upstream._1)
@ -415,7 +415,8 @@ object GraphInterpreterSpec {
}
}
def manualInit(assembly: GraphAssembly): Unit = _interpreter = new GraphInterpreter(assembly, (_, _, _) ())
def manualInit(assembly: GraphAssembly): Unit =
_interpreter = new GraphInterpreter(assembly, NoMaterializer, (_, _, _) ())
def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq)

View file

@ -0,0 +1,242 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.util.concurrent.TimeoutException
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.testkit.{ AkkaSpec, TestSubscriber, TestPublisher }
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import akka.stream.testkit.Utils._
class TimeoutsSpec extends AkkaSpec {
implicit val mat = ActorMaterializer()
"InitialTimeout" must {
"pass through elements unmodified" in assertAllStagesStopped {
Await.result(
Source(1 to 100).via(Timeouts.initalTimeout(2.seconds)).grouped(200).runWith(Sink.head),
3.seconds) should ===(1 to 100)
}
"pass through error unmodified" in assertAllStagesStopped {
a[TE] shouldBe thrownBy {
Await.result(
Source(1 to 100).concat(Source.failed(TE("test")))
.via(Timeouts.initalTimeout(2.seconds))
.grouped(200).runWith(Sink.head),
3.seconds)
}
}
"fail if no initial element passes until timeout" in assertAllStagesStopped {
val downstreamProbe = TestSubscriber.probe[Int]()
Source.lazyEmpty[Int]
.via(Timeouts.initalTimeout(1.seconds))
.runWith(Sink(downstreamProbe))
downstreamProbe.expectSubscription()
downstreamProbe.expectNoMsg(500.millis)
val ex = downstreamProbe.expectError()
ex.getMessage should ===("The first element has not yet passed through in 1 second.")
}
}
"CompletionTimeout" must {
"pass through elements unmodified" in assertAllStagesStopped {
Await.result(
Source(1 to 100).via(Timeouts.completionTimeout(2.seconds)).grouped(200).runWith(Sink.head),
3.seconds) should ===(1 to 100)
}
"pass through error unmodified" in assertAllStagesStopped {
a[TE] shouldBe thrownBy {
Await.result(
Source(1 to 100).concat(Source.failed(TE("test")))
.via(Timeouts.completionTimeout(2.seconds))
.grouped(200).runWith(Sink.head),
3.seconds)
}
}
"fail if not completed until timeout" in assertAllStagesStopped {
val upstreamProbe = TestPublisher.probe[Int]()
val downstreamProbe = TestSubscriber.probe[Int]()
Source(upstreamProbe)
.via(Timeouts.completionTimeout(2.seconds))
.runWith(Sink(downstreamProbe))
upstreamProbe.sendNext(1)
downstreamProbe.requestNext(1)
downstreamProbe.expectNoMsg(500.millis) // No timeout yet
upstreamProbe.sendNext(2)
downstreamProbe.requestNext(2)
downstreamProbe.expectNoMsg(500.millis) // No timeout yet
val ex = downstreamProbe.expectError()
ex.getMessage should ===("The stream has not been completed in 2 seconds.")
}
}
"IdleTimeout" must {
"pass through elements unmodified" in assertAllStagesStopped {
Await.result(
Source(1 to 100).via(Timeouts.idleTimeout(2.seconds)).grouped(200).runWith(Sink.head),
3.seconds) should ===(1 to 100)
}
"pass through error unmodified" in assertAllStagesStopped {
a[TE] shouldBe thrownBy {
Await.result(
Source(1 to 100).concat(Source.failed(TE("test")))
.via(Timeouts.idleTimeout(2.seconds))
.grouped(200).runWith(Sink.head),
3.seconds)
}
}
"fail if time between elements is too large" in assertAllStagesStopped {
val upstreamProbe = TestPublisher.probe[Int]()
val downstreamProbe = TestSubscriber.probe[Int]()
Source(upstreamProbe)
.via(Timeouts.idleTimeout(1.seconds))
.runWith(Sink(downstreamProbe))
// Two seconds in overall, but won't timeout until time between elements is large enough
// (i.e. this works differently from completionTimeout)
for (_ 1 to 4) {
upstreamProbe.sendNext(1)
downstreamProbe.requestNext(1)
downstreamProbe.expectNoMsg(500.millis) // No timeout yet
}
val ex = downstreamProbe.expectError()
ex.getMessage should ===("No elements passed in the last 1 second.")
}
}
"IdleTimeoutBidi" must {
"not signal error in simple loopback case and pass through elements unmodified" in assertAllStagesStopped {
val timeoutIdentity = Timeouts.idleTimeoutBidi[Int, Int](2.seconds).join(Flow[Int])
Await.result(
Source(1 to 100).via(timeoutIdentity).grouped(200).runWith(Sink.head),
3.seconds) should ===(1 to 100)
}
"not signal error if traffic is one-way" in assertAllStagesStopped {
val upstreamWriter = TestPublisher.probe[Int]()
val downstreamWriter = TestPublisher.probe[String]()
val upstream = Flow.wrap(Sink.ignore, Source(upstreamWriter))(Keep.left)
val downstream = Flow.wrap(Sink.ignore, Source(downstreamWriter))(Keep.left)
val assembly: RunnableGraph[(Future[Unit], Future[Unit])] = upstream
.joinMat(Timeouts.idleTimeoutBidi[Int, String](2.seconds))(Keep.left)
.joinMat(downstream)(Keep.both)
val (upFinished, downFinished) = assembly.run()
upstreamWriter.sendNext(1)
Thread.sleep(1000)
upstreamWriter.sendNext(1)
Thread.sleep(1000)
upstreamWriter.sendNext(1)
Thread.sleep(1000)
upstreamWriter.sendComplete()
downstreamWriter.sendComplete()
Await.ready(upFinished, 3.seconds)
Await.ready(downFinished, 3.seconds)
}
"be able to signal timeout once no traffic on either sides" in assertAllStagesStopped {
val upWrite = TestPublisher.probe[String]()
val upRead = TestSubscriber.probe[Int]()
val downWrite = TestPublisher.probe[Int]()
val downRead = TestSubscriber.probe[String]()
FlowGraph.closed() { implicit b
import FlowGraph.Implicits._
val timeoutStage = b.add(Timeouts.idleTimeoutBidi[String, Int](2.seconds))
Source(upWrite) ~> timeoutStage.in1;
timeoutStage.out1 ~> Sink(downRead)
Sink(upRead) <~ timeoutStage.out2;
timeoutStage.in2 <~ Source(downWrite)
}.run()
// Request enough for the whole test
upRead.request(100)
downRead.request(100)
upWrite.sendNext("DATA1")
downRead.expectNext("DATA1")
Thread.sleep(1500)
downWrite.sendNext(1)
upRead.expectNext(1)
Thread.sleep(1500)
upWrite.sendNext("DATA2")
downRead.expectNext("DATA2")
Thread.sleep(1000)
downWrite.sendNext(2)
upRead.expectNext(2)
upRead.expectNoMsg(500.millis)
val error1 = upRead.expectError()
val error2 = downRead.expectError()
error1.isInstanceOf[TimeoutException] should be(true)
error1.getMessage should ===("No elements passed in the last 2 seconds.")
error2 should ===(error1)
upWrite.expectCancellation()
downWrite.expectCancellation()
}
"signal error to all outputs" in assertAllStagesStopped {
val upWrite = TestPublisher.probe[String]()
val upRead = TestSubscriber.probe[Int]()
val downWrite = TestPublisher.probe[Int]()
val downRead = TestSubscriber.probe[String]()
FlowGraph.closed() { implicit b
import FlowGraph.Implicits._
val timeoutStage = b.add(Timeouts.idleTimeoutBidi[String, Int](2.seconds))
Source(upWrite) ~> timeoutStage.in1;
timeoutStage.out1 ~> Sink(downRead)
Sink(upRead) <~ timeoutStage.out2;
timeoutStage.in2 <~ Source(downWrite)
}.run()
val te = TE("test")
upWrite.sendError(te)
upRead.expectSubscriptionAndError(te)
downRead.expectSubscriptionAndError(te)
downWrite.expectCancellation()
}
}
}

View file

@ -35,6 +35,16 @@ class FlowDropWithinSpec extends AkkaSpec {
c.expectNoMsg(200.millis)
}
"deliver completion even before the duration" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).dropWithin(1.day).runWith(Sink(downstream))
upstream.sendComplete()
downstream.expectSubscriptionAndComplete()
}
}
}

View file

@ -100,27 +100,28 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
}
"reset time window when max elements reached" in {
val input = Iterator.from(1)
val p = TestPublisher.manualProbe[Int]()
val c = TestSubscriber.manualProbe[immutable.Seq[Int]]()
Source(p).groupedWithin(3, 2.second).to(Sink(c)).run()
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(4)
val demand1 = pSub.expectRequest.toInt
demand1 should be(4)
c.expectNoMsg(1000.millis)
(1 to demand1) foreach { _ pSub.sendNext(input.next()) }
c.within(1000.millis) {
c.expectNext((1 to 3).toVector)
val inputs = Iterator.from(1)
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[immutable.Seq[Int]]()
Source(upstream).groupedWithin(3, 2.second).to(Sink(downstream)).run()
downstream.request(2)
downstream.expectNoMsg(1000.millis)
(1 to 4) foreach { _ upstream.sendNext(inputs.next()) }
downstream.within(1000.millis) {
downstream.expectNext((1 to 3).toVector)
}
c.expectNoMsg(1500.millis)
c.within(1000.millis) {
c.expectNext(List(4))
downstream.expectNoMsg(1500.millis)
downstream.within(1000.millis) {
downstream.expectNext(List(4))
}
pSub.sendComplete()
c.expectComplete
c.expectNoMsg(100.millis)
upstream.sendComplete()
downstream.expectComplete
downstream.expectNoMsg(100.millis)
}
"group evenly" in {

View file

@ -1,87 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.ActorMaterializer
import akka.stream.TimerTransformer
import akka.stream.testkit._
import akka.stream.testkit.Utils._
class FlowTimerTransformerSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
"A Flow with TimerTransformer operations" must {
"produce scheduled ticks as expected" in assertAllStagesStopped {
val p = TestPublisher.manualProbe[Int]()
val p2 = Source(p).
timerTransform(() new TimerTransformer[Int, Int] {
schedulePeriodically("tick", 100.millis)
var tickCount = 0
override def onNext(elem: Int) = List(elem)
override def onTimer(timerKey: Any) = {
tickCount += 1
if (tickCount == 3) cancelTimer("tick")
List(tickCount)
}
override def isComplete: Boolean = !isTimerActive("tick")
}).
runWith(Sink.publisher)
val subscriber = TestSubscriber.manualProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
subscription.request(5)
subscriber.expectNext(1)
subscriber.expectNext(2)
subscriber.expectNext(3)
subscriber.expectComplete()
}
"schedule ticks when last transformation step (consume)" in {
val p = TestPublisher.manualProbe[Int]()
val p2 = Source(p).
timerTransform(() new TimerTransformer[Int, Int] {
schedulePeriodically("tick", 100.millis)
var tickCount = 0
override def onNext(elem: Int) = List(elem)
override def onTimer(timerKey: Any) = {
tickCount += 1
if (tickCount == 3) cancelTimer("tick")
testActor ! "tick-" + tickCount
List(tickCount)
}
override def isComplete: Boolean = !isTimerActive("tick")
}).
to(Sink.ignore).run()
val pSub = p.expectSubscription()
expectMsg("tick-1")
expectMsg("tick-2")
expectMsg("tick-3")
pSub.sendComplete()
}
"propagate error if onTimer throws an exception" in assertAllStagesStopped {
val exception = new Exception("Expected exception to the rule") with NoStackTrace
val p = TestPublisher.manualProbe[Int]()
val p2 = Source(p).
timerTransform(() new TimerTransformer[Int, Int] {
scheduleOnce("tick", 100.millis)
def onNext(element: Int) = Nil
override def onTimer(timerKey: Any) =
throw exception
}).runWith(Sink.publisher)
val subscriber = TestSubscriber.manualProbe[Int]()
p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription()
subscription.request(5)
subscriber.expectError(exception)
}
}
}

View file

@ -0,0 +1,214 @@
package akka.stream.scaladsl
import akka.actor.ActorRef
import akka.stream.ActorMaterializer
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage.{ OutHandler, AsyncCallback, InHandler }
import akka.stream.testkit.{ AkkaSpec, TestPublisher }
import akka.testkit.TestDuration
import scala.concurrent.Promise
import scala.concurrent.duration._
import akka.stream.testkit._
import akka.stream.testkit.Utils._
object GraphStageTimersSpec {
case object TestSingleTimer
case object TestSingleTimerResubmit
case object TestCancelTimer
case object TestCancelTimerAck
case object TestRepeatedTimer
case class Tick(n: Int)
class SideChannel {
@volatile var asyncCallback: AsyncCallback[Any] = _
@volatile var stopPromise: Promise[Unit] = _
def isReady: Boolean = asyncCallback ne null
def !(msg: Any) = asyncCallback.invoke(msg)
def stopStage(): Unit = stopPromise.trySuccess(())
}
}
class GraphStageTimersSpec extends AkkaSpec {
import GraphStageTimersSpec._
implicit val mat = ActorMaterializer()
class TestStage(probe: ActorRef, sideChannel: SideChannel) extends SimpleLinearGraphStage[Int] {
override def createLogic = new SimpleLinearStageLogic {
val tickCount = Iterator from 1
setHandler(in, new InHandler {
override def onPush() = push(out, grab(in))
})
override def preStart() = {
sideChannel.asyncCallback = getAsyncCallback(onTestEvent)
}
override protected def onTimer(timerKey: Any) = {
val tick = Tick(tickCount.next())
probe ! tick
if (timerKey == "TestSingleTimerResubmit" && tick.n == 1)
scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated)
else if (timerKey == "TestRepeatedTimer" && tick.n == 5)
cancelTimer("TestRepeatedTimer")
Nil
}
private def onTestEvent(event: Any): Unit = event match {
case TestSingleTimer
scheduleOnce("TestSingleTimer", 500.millis.dilated)
case TestSingleTimerResubmit
scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated)
case TestCancelTimer
scheduleOnce("TestCancelTimer", 1.milli.dilated)
// Likely in mailbox but we cannot guarantee
cancelTimer("TestCancelTimer")
probe ! TestCancelTimerAck
scheduleOnce("TestCancelTimer", 500.milli.dilated)
case TestRepeatedTimer
schedulePeriodically("TestRepeatedTimer", 100.millis.dilated)
}
}
}
"GraphStage timer support" must {
def setupIsolatedStage: SideChannel = {
val channel = new SideChannel
val stopPromise = Source.lazyEmpty[Int].via(new TestStage(testActor, channel)).to(Sink.ignore).run()
channel.stopPromise = stopPromise
awaitCond(channel.isReady)
channel
}
"receive single-shot timer" in {
val driver = setupIsolatedStage
within(2 seconds) {
within(500 millis, 1 second) {
driver ! TestSingleTimer
expectMsg(Tick(1))
}
expectNoMsg(1 second)
}
driver.stopStage()
}
"resubmit single-shot timer" in {
val driver = setupIsolatedStage
within(2.5 seconds) {
within(500 millis, 1 second) {
driver ! TestSingleTimerResubmit
expectMsg(Tick(1))
}
within(1 second) {
expectMsg(Tick(2))
}
expectNoMsg(1 second)
}
driver.stopStage()
}
"correctly cancel a named timer" in {
val driver = setupIsolatedStage
driver ! TestCancelTimer
within(500 millis) {
expectMsg(TestCancelTimerAck)
}
within(300 millis, 1 second) {
expectMsg(Tick(1))
}
expectNoMsg(1 second)
driver.stopStage()
}
"receive and cancel a repeated timer" in {
val driver = setupIsolatedStage
driver ! TestRepeatedTimer
val seq = receiveWhile(2 seconds) {
case t: Tick t
}
seq should have length 5
expectNoMsg(1 second)
driver.stopStage()
}
class TestStage2 extends SimpleLinearGraphStage[Int] {
override def createLogic = new SimpleLinearStageLogic {
schedulePeriodically("tick", 100.millis)
var tickCount = 0
setHandler(out, new OutHandler {
override def onPull() = () // Do nothing
override def onDownstreamFinish() = completeStage()
})
setHandler(in, new InHandler {
override def onPush() = () // Do nothing
override def onUpstreamFinish() = completeStage()
override def onUpstreamFailure(ex: Throwable) = failStage(ex)
})
override def onTimer(timerKey: Any) = {
tickCount += 1
if (isAvailable(out)) push(out, tickCount)
if (tickCount == 3) cancelTimer("tick")
}
}
}
"produce scheduled ticks as expected" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).via(new TestStage2).runWith(Sink(downstream))
downstream.request(5)
downstream.expectNext(1)
downstream.expectNext(2)
downstream.expectNext(3)
downstream.expectNoMsg(1.second)
upstream.sendComplete()
downstream.expectComplete()
}
"propagate error if onTimer throws an exception" in assertAllStagesStopped {
val exception = TE("Expected exception to the rule")
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).via(new SimpleLinearGraphStage[Int] {
override def createLogic = new SimpleLinearStageLogic {
scheduleOnce("tick", 100.millis)
setHandler(in, new InHandler {
override def onPush() = () // Ingore
})
override def onTimer(timerKey: Any) = throw exception
}
}).runWith(Sink(downstream))
downstream.request(1)
downstream.expectError(exception)
}
}
}

View file

@ -1,133 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorCell
import akka.actor.ActorRef
import akka.actor.Props
import akka.stream.TimerTransformer
import akka.stream.TimerTransformer.Scheduled
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestDuration
import akka.testkit.TestKit
object TimerTransformerSpec {
case object TestSingleTimer
case object TestSingleTimerResubmit
case object TestCancelTimer
case object TestCancelTimerAck
case object TestRepeatedTimer
case class Tick(n: Int)
def driverProps(probe: ActorRef): Props =
Props(classOf[Driver], probe).withDispatcher("akka.test.stream-dispatcher")
class Driver(probe: ActorRef) extends Actor {
// need implicit system for dilated
import context.system
val tickCount = Iterator from 1
val transformer = new TimerTransformer[Int, Int] {
override def onNext(elem: Int): immutable.Seq[Int] = List(elem)
override def onTimer(timerKey: Any): immutable.Seq[Int] = {
val tick = Tick(tickCount.next())
probe ! tick
if (timerKey == "TestSingleTimerResubmit" && tick.n == 1)
scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated)
else if (timerKey == "TestRepeatedTimer" && tick.n == 5)
cancelTimer("TestRepeatedTimer")
Nil
}
}
override def preStart(): Unit = {
super.preStart()
transformer.start(context)
}
override def postStop(): Unit = {
super.postStop()
transformer.stop()
}
def receive = {
case TestSingleTimer
transformer.scheduleOnce("TestSingleTimer", 500.millis.dilated)
case TestSingleTimerResubmit
transformer.scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated)
case TestCancelTimer
transformer.scheduleOnce("TestCancelTimer", 1.milli.dilated)
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1.second.dilated)
transformer.cancelTimer("TestCancelTimer")
probe ! TestCancelTimerAck
transformer.scheduleOnce("TestCancelTimer", 500.milli.dilated)
case TestRepeatedTimer
transformer.schedulePeriodically("TestRepeatedTimer", 100.millis.dilated)
case s: Scheduled transformer.onScheduled(s)
}
}
}
class TimerTransformerSpec extends AkkaSpec {
import TimerTransformerSpec._
"A TimerTransformer" must {
"receive single-shot timer" in {
val driver = system.actorOf(driverProps(testActor))
within(2 seconds) {
within(500 millis, 1 second) {
driver ! TestSingleTimer
expectMsg(Tick(1))
}
expectNoMsg(1 second)
}
}
"resubmit single-shot timer" in {
val driver = system.actorOf(driverProps(testActor))
within(2.5 seconds) {
within(500 millis, 1 second) {
driver ! TestSingleTimerResubmit
expectMsg(Tick(1))
}
within(1 second) {
expectMsg(Tick(2))
}
expectNoMsg(1 second)
}
}
"correctly cancel a named timer" in {
val driver = system.actorOf(driverProps(testActor))
driver ! TestCancelTimer
within(500 millis) {
expectMsg(TestCancelTimerAck)
}
within(300 millis, 1 second) {
expectMsg(Tick(1))
}
expectNoMsg(1 second)
}
"receive and cancel a repeated timer" in {
val driver = system.actorOf(driverProps(testActor))
driver ! TestRepeatedTimer
val seq = receiveWhile(2 seconds) {
case t: Tick t
}
seq should have length 5
expectNoMsg(1 second)
}
}
}

View file

@ -3,9 +3,13 @@
*/
package akka.stream
import akka.actor.Cancellable
import scala.concurrent.ExecutionContextExecutor
import akka.japi
import scala.concurrent.duration.FiniteDuration
abstract class Materializer {
/**
@ -31,6 +35,24 @@ abstract class Materializer {
*/
implicit def executionContext: ExecutionContextExecutor
/**
* Interface for stages that need timer services for their functionality. Schedules a
* single task with the given delay.
*
* @return A [[akka.actor.Cancellable]] that allows cancelling the timer. Cancelling is best effort, if the event
* has been already enqueued it will not have an effect.
*/
def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
/**
* Interface for stages that need timer services for their functionality. Schedules a
* repeated task with the given interval between invocations.
*
* @return A [[akka.actor.Cancellable]] that allows cancelling the timer. Cancelling is best effort, if the event
* has been already enqueued it will not have an effect.
*/
def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable
}
/**
@ -43,6 +65,12 @@ private[akka] object NoMaterializer extends Materializer {
throw new UnsupportedOperationException("NoMaterializer cannot materialize")
override def executionContext: ExecutionContextExecutor =
throw new UnsupportedOperationException("NoMaterializer does not provide an ExecutionContext")
def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
throw new UnsupportedOperationException("NoMaterializer cannot schedule a single event")
def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable =
throw new UnsupportedOperationException("NoMaterializer cannot schedule a repeated event")
}
/**

View file

@ -1,131 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.actor.{ ActorContext, Cancellable }
import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
/**
* Transformer with support for scheduling keyed (named) timer events.
*/
// TODO: TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410
@deprecated("TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410", "1.0-M1")
private[akka] abstract class TimerTransformer[-T, +U] extends TransformerLike[T, U] {
import TimerTransformer._
private val timers = mutable.Map[Any, Timer]()
private val timerIdGen = Iterator from 1
private var context: Option[ActorContext] = None
// when scheduling before `start` we must queue the operations
private var queued = List.empty[Queued]
/**
* INTERNAL API
*/
private[akka] final def start(ctx: ActorContext): Unit = {
context = Some(ctx)
queued.reverse.foreach {
case QueuedSchedule(timerKey, interval) schedulePeriodically(timerKey, interval)
case QueuedScheduleOnce(timerKey, delay) scheduleOnce(timerKey, delay)
case QueuedCancelTimer(timerKey) cancelTimer(timerKey)
}
queued = Nil
}
/**
* INTERNAL API
*/
private[akka] final def stop(): Unit = {
timers.foreach { case (_, Timer(_, task)) task.cancel() }
timers.clear()
}
/**
* Schedule timer to call [[#onTimer]] periodically with the given interval.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit =
context match {
case Some(ctx)
cancelTimer(timerKey)
val id = timerIdGen.next()
val task = ctx.system.scheduler.schedule(interval, interval, ctx.self,
Scheduled(timerKey, id, repeating = true))(ctx.dispatcher)
timers(timerKey) = Timer(id, task)
case None
queued = QueuedSchedule(timerKey, interval) :: queued
}
/**
* Schedule timer to call [[#onTimer]] after given delay.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit =
context match {
case Some(ctx)
cancelTimer(timerKey)
val id = timerIdGen.next()
val task = ctx.system.scheduler.scheduleOnce(delay, ctx.self,
Scheduled(timerKey, id, repeating = false))(ctx.dispatcher)
timers(timerKey) = Timer(id, task)
case None
queued = QueuedScheduleOnce(timerKey, delay) :: queued
}
/**
* Cancel timer, ensuring that the [[#onTimer]] is not subsequently called.
* @param timerKey key of the timer to cancel
*/
def cancelTimer(timerKey: Any): Unit =
timers.get(timerKey).foreach { t
t.task.cancel()
timers -= timerKey
}
/**
* Inquire whether the timer is still active. Returns true unless the
* timer does not exist, has previously been canceled or if it was a
* single-shot timer that was already triggered.
*/
final def isTimerActive(timerKey: Any): Boolean = timers contains timerKey
/**
* INTERNAL API
*/
private[akka] def onScheduled(scheduled: Scheduled): immutable.Seq[U] = {
val Id = scheduled.timerId
timers.get(scheduled.timerKey) match {
case Some(Timer(Id, _))
if (!scheduled.repeating) timers -= scheduled.timerKey
onTimer(scheduled.timerKey)
case _ Nil // already canceled, or re-scheduled
}
}
/**
* Will be called when the scheduled timer is triggered.
* @param timerKey key of the scheduled timer
*/
def onTimer(timerKey: Any): immutable.Seq[U]
}
/**
* INTERNAL API
*/
private object TimerTransformer {
final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression
sealed trait Queued
final case class QueuedSchedule(timerKey: Any, interval: FiniteDuration) extends Queued
final case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued
final case class QueuedCancelTimer(timerKey: Any) extends Queued
final case class Timer(id: Int, task: Cancellable)
}

View file

@ -21,6 +21,7 @@ import akka.stream.stage.Stage
import akka.stream.Attributes._
import org.reactivestreams._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, ExecutionContextExecutor }
/**
@ -61,6 +62,12 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem,
}
}
override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable) =
system.scheduler.schedule(initialDelay, interval, task)(executionContext)
override def scheduleOnce(delay: FiniteDuration, task: Runnable) =
system.scheduler.scheduleOnce(delay, task)(executionContext)
override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
if (haveShutDown.get())
throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.")
@ -114,7 +121,7 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem,
case graph: GraphModule
val calculatedSettings = effectiveSettings(effectiveAttributes)
val props = ActorGraphInterpreter.props(graph.assembly, graph.shape, calculatedSettings)
val props = ActorGraphInterpreter.props(graph.assembly, graph.shape, calculatedSettings, ActorMaterializerImpl.this)
val impl = actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
for ((inlet, i) graph.shape.inlets.iterator.zipWithIndex) {
val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, i)
@ -342,7 +349,6 @@ private[akka] object ActorProcessorFactory {
case Split(d, _) (SplitWhereProcessorImpl.props(settings, d), ())
case ConcatAll(_) (ConcatAllImpl.props(materializer), ())
case StageFactory(mkStage, _) interp(mkStage())
case TimerTransform(mkStage, _) (TimerTransformerProcessorsImpl.props(settings, mkStage()), ())
case MaterializingStageFactory(mkStageAndMat, _)
val s_m = mkStageAndMat()
(ActorInterpreter.props(settings, List(s_m._1), materializer, att), s_m._2)

View file

@ -6,7 +6,7 @@ package akka.stream.impl
import akka.event.LoggingAdapter
import akka.stream.impl.SplitDecision.SplitDecision
import akka.stream.impl.StreamLayout._
import akka.stream.{ OverflowStrategy, TimerTransformer, Attributes }
import akka.stream.{ OverflowStrategy, Attributes }
import akka.stream.Attributes._
import akka.stream.stage.Stage
import org.reactivestreams.Processor
@ -101,11 +101,6 @@ private[stream] object Stages {
override def carbonCopy: Module = newInstance
}
final case class TimerTransform(mkStage: () TimerTransformer[Any, Any], attributes: Attributes = timerTransform) extends StageModule {
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}
final case class StageFactory(mkStage: () Stage[_, _], attributes: Attributes = stageFactory) extends StageModule {
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()

View file

@ -1,98 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.ActorMaterializerSettings
import akka.stream.TimerTransformer
import scala.util.control.NonFatal
import akka.actor.{ Deploy, Props }
private[akka] object TimerTransformerProcessorsImpl {
def props(settings: ActorMaterializerSettings, transformer: TimerTransformer[Any, Any]): Props =
Props(new TimerTransformerProcessorsImpl(settings, transformer)).withDeploy(Deploy.local)
}
/**
* INTERNAL API
*/
private[akka] class TimerTransformerProcessorsImpl(
_settings: ActorMaterializerSettings,
transformer: TimerTransformer[Any, Any])
extends ActorProcessorImpl(_settings) with Emit {
import TimerTransformer._
var errorEvent: Option[Throwable] = None
override def preStart(): Unit = {
super.preStart()
initialPhase(1, running)
transformer.start(context)
}
override def postStop(): Unit =
try {
super.postStop()
transformer.stop()
} finally transformer.cleanup()
override def onError(e: Throwable): Unit = {
try {
transformer.onError(e)
errorEvent = Some(e)
pump()
} catch { case NonFatal(ex) fail(ex) }
}
val schedulerInputs: Inputs = new DefaultInputTransferStates {
val queue = new java.util.LinkedList[Any]
override def dequeueInputElement(): Any = queue.removeFirst()
override def subreceive: SubReceive = new SubReceive({
case s: Scheduled
try {
transformer.onScheduled(s) foreach { elem
queue.add(elem)
}
pump()
} catch { case NonFatal(ex) pumpFailed(ex) }
})
override def cancel(): Unit = ()
override def isClosed: Boolean = false
override def inputsDepleted: Boolean = false
override def inputsAvailable: Boolean = !queue.isEmpty
}
override def activeReceive = super.activeReceive.orElse[Any, Unit](schedulerInputs.subreceive)
object RunningCondition extends TransferState {
def isReady = {
((primaryInputs.inputsAvailable || schedulerInputs.inputsAvailable || transformer.isComplete) &&
primaryOutputs.demandAvailable) || primaryInputs.inputsDepleted
}
def isCompleted = false
}
private val terminate = TransferPhase(Always) { ()
emits = transformer.onTermination(errorEvent)
emitAndThen(completedPhase)
}
private val running: TransferPhase = TransferPhase(RunningCondition) { ()
if (primaryInputs.inputsDepleted || (transformer.isComplete && !schedulerInputs.inputsAvailable)) {
nextPhase(terminate)
} else if (schedulerInputs.inputsAvailable) {
emits = List(schedulerInputs.dequeueInputElement())
emitAndThen(running)
} else {
emits = transformer.onNext(primaryInputs.dequeueInputElement())
if (transformer.isComplete) emitAndThen(terminate)
else emitAndThen(running)
}
}
override def toString: String = s"Transformer(emits=$emits, transformer=$transformer)"
}

View file

@ -73,8 +73,8 @@ private[stream] object ActorGraphInterpreter {
}
}
def props(assembly: GraphAssembly, shape: Shape, settings: ActorMaterializerSettings): Props =
Props(new ActorGraphInterpreter(assembly, shape, settings)).withDeploy(Deploy.local)
def props(assembly: GraphAssembly, shape: Shape, settings: ActorMaterializerSettings, mat: Materializer): Props =
Props(new ActorGraphInterpreter(assembly, shape, settings, mat)).withDeploy(Deploy.local)
class BatchingActorInputBoundary(size: Int, id: Int) extends UpstreamBoundaryStageLogic[Any] {
require(size > 0, "buffer size cannot be zero")
@ -279,10 +279,17 @@ private[stream] object ActorGraphInterpreter {
/**
* INTERNAL API
*/
private[stream] class ActorGraphInterpreter(assembly: GraphAssembly, shape: Shape, settings: ActorMaterializerSettings) extends Actor {
private[stream] class ActorGraphInterpreter(
assembly: GraphAssembly,
shape: Shape,
settings: ActorMaterializerSettings,
mat: Materializer) extends Actor {
import ActorGraphInterpreter._
val interpreter = new GraphInterpreter(assembly, (logic, event, handler) self ! AsyncInput(logic, event, handler))
val interpreter = new GraphInterpreter(
assembly,
mat,
(logic, event, handler) self ! AsyncInput(logic, event, handler))
val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _))
val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _))
// Limits the number of events processed by the interpreter before scheduling a self-message for fairness with other
@ -324,8 +331,13 @@ private[stream] class ActorGraphInterpreter(assembly: GraphAssembly, shape: Shap
if (interpreter.isSuspended) runBatch()
case AsyncInput(logic, event, handler)
if (GraphInterpreter.Debug) println(s"ASYNC $event")
if (!interpreter.isStageCompleted(logic.stageId))
handler(event)
if (!interpreter.isStageCompleted(logic.stageId)) {
try handler(event)
catch {
case NonFatal(e) logic.failStage(e)
}
}
runBatch()
// Initialization and completion messages
@ -347,6 +359,7 @@ private[stream] class ActorGraphInterpreter(assembly: GraphAssembly, shape: Shap
outputs(id).subscribePending()
case ExposedPublisher(id, publisher)
outputs(id).exposedPublisher(publisher)
}
override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {

View file

@ -4,7 +4,9 @@
package akka.stream.impl.fusing
import akka.stream.stage.{ OutHandler, InHandler, GraphStage, GraphStageLogic }
import akka.stream.{ Shape, Inlet, Outlet }
import akka.stream.{ Materializer, Shape, Inlet, Outlet }
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -180,6 +182,7 @@ private[stream] object GraphInterpreter {
*/
private[stream] final class GraphInterpreter(
private val assembly: GraphInterpreter.GraphAssembly,
val materializer: Materializer,
val onAsyncInput: (GraphStageLogic, Any, (Any) Unit) Unit) {
import GraphInterpreter._
@ -256,6 +259,7 @@ private[stream] final class GraphInterpreter(
var i = 0
while (i < logics.length) {
logics(i).stageId = i
logics(i).beforePreStart()
logics(i).preStart()
i += 1
}
@ -267,7 +271,10 @@ private[stream] final class GraphInterpreter(
def finish(): Unit = {
var i = 0
while (i < logics.length) {
if (!isStageCompleted(i)) logics(i).postStop()
if (!isStageCompleted(i)) {
logics(i).postStop()
logics(i).afterPostStop()
}
i += 1
}
}
@ -290,7 +297,19 @@ private[stream] final class GraphInterpreter(
var eventsRemaining = eventLimit
var connection = dequeue()
while (eventsRemaining > 0 && connection != NoEvent) {
processEvent(connection)
try processEvent(connection)
catch {
case NonFatal(e)
val stageId = connectionStates(connection) match {
case Failed(ex) throw new IllegalStateException("Double fault. Failure while handling failure.", e)
case Pushable assembly.outOwners(connection)
case Completed assembly.inOwners(connection)
case Cancelled assembly.outOwners(connection)
case PushCompleted(elem) assembly.inOwners(connection)
case pushedElem assembly.inOwners(connection)
}
logics(stageId).failStage(e)
}
eventsRemaining -= 1
if (eventsRemaining > 0) connection = dequeue()
}
@ -392,6 +411,7 @@ private[stream] final class GraphInterpreter(
if (activeConnections == 1) {
runningStages -= 1
logics(stageId).postStop()
logics(stageId).afterPostStop()
}
}
}

View file

@ -11,23 +11,31 @@ import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
*/
object GraphStages {
class Identity[T] extends GraphStage[FlowShape[T, T]] {
/**
* INERNAL API
*/
private[stream] abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("in")
val out = Outlet[T]("out")
override val shape = FlowShape(in, out)
override def createLogic: GraphStageLogic = new GraphStageLogic {
protected abstract class SimpleLinearStageLogic extends GraphStageLogic {
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = completeStage()
})
}
}
class Identity[T] extends SimpleLinearGraphStage[T] {
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = completeStage()
})
}
override def toString = "Identity"

View file

@ -6,13 +6,16 @@ package akka.stream.impl.fusing
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.LogLevels
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
@ -726,3 +729,121 @@ private[akka] object Log {
private final val OffInt = LogLevels.Off.asInt
private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)
}
/**
* INTERNAL API
*/
private[stream] object TimerKeys {
case object TakeWithinTimerKey
case object DropWithinTimerKey
case object GroupedWithinTimerKey
}
private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
val in = Inlet[T]("in")
val out = Outlet[immutable.Seq[T]]("out")
val shape = FlowShape(in, out)
override def createLogic: GraphStageLogic = new GraphStageLogic {
private val buf: VectorBuilder[T] = new VectorBuilder
// True if:
// - buf is nonEmpty
// AND
// - timer fired OR group is full
private var groupClosed = false
private var finished = false
private var elements = 0
private val GroupedWithinTimer = "GroupedWithinTimer"
override def preStart() = {
schedulePeriodically(GroupedWithinTimer, d)
pull(in)
}
private def nextElement(elem: T): Unit = {
buf += elem
elements += 1
if (elements == n) {
schedulePeriodically(GroupedWithinTimer, d)
closeGroup()
} else pull(in)
}
private def closeGroup(): Unit = {
groupClosed = true
if (isAvailable(out)) emitGroup()
}
private def emitGroup(): Unit = {
push(out, buf.result())
buf.clear()
if (!finished) startNewGroup()
else completeStage()
}
private def startNewGroup(): Unit = {
elements = 0
groupClosed = false
if (isAvailable(in)) nextElement(grab(in))
else if (!hasBeenPulled(in)) pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit =
if (!groupClosed) nextElement(grab(in)) // otherwise keep the element for next round
override def onUpstreamFinish(): Unit = {
finished = true
if (!groupClosed && elements > 0) closeGroup()
else completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
setHandler(out, new OutHandler {
override def onPull(): Unit = if (groupClosed) emitGroup()
override def onDownstreamFinish(): Unit = completeStage()
})
override protected def onTimer(timerKey: Any) =
if (elements > 0) closeGroup()
}
}
private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
final override protected def onTimer(key: Any): Unit =
completeStage()
scheduleOnce("TakeWithinTimer", timeout)
}
override def toString = "TakeWithin"
}
private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
private var allow = false
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
setHandler(in, new InHandler {
override def onPush(): Unit =
if (allow) push(out, grab(in))
else pull(in)
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
final override protected def onTimer(key: Any): Unit = allow = true
scheduleOnce("DropWithinTimer", timeout)
}
override def toString = "DropWithin"
}

View file

@ -0,0 +1,171 @@
package akka.stream.io
import java.util.concurrent.{ TimeUnit, TimeoutException }
import akka.actor.{ Cancellable, ActorSystem }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.{ FlowShape, Outlet, Inlet, BidiShape }
import akka.stream.scaladsl.{ BidiFlow, Flow }
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
import scala.concurrent.duration.{ Deadline, FiniteDuration }
/**
* Various stages for controlling timeouts on IO related streams (although not necessarily).
*
* The common theme among the processing stages here that
* - they wait for certain event or events to happen
* - they have a timer that may fire before these events
* - if the timer fires before the event happens, these stages all fail the stream
* - otherwise, these streams do not interfere with the element flow, ordinary completion or failure
*/
object Timeouts {
/**
* If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[TimeoutException]].
*/
def initalTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
Flow.wrap(new InitialTimeout[T](timeout))
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[TimeoutException]].
*/
def completionTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
Flow.wrap(new CompletionTimeout[T](timeout))
/**
* If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[TimeoutException]].
*/
def idleTimeout[T](timeout: FiniteDuration): Flow[T, T, Unit] =
Flow.wrap(new IdleTimeout[T](timeout))
/**
* If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed
* with a [[TimeoutException]].
*
* There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage.
* If the timout is configured to be 1 seconds, then this stage will not fail even though there are elements flowing
* every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers
* the *joint* frequencies of the elements in both directions.
*/
def idleTimeoutBidi[A, B](timeout: FiniteDuration): BidiFlow[A, A, B, B, Unit] =
BidiFlow.wrap(new IdleTimeoutBidi[A, B](timeout))
private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = {
import scala.concurrent.duration._
FiniteDuration(
math.min(math.max(timeout.toNanos / 8, 100.millis.toNanos), timeout.toNanos / 2),
TimeUnit.NANOSECONDS)
}
private class InitialTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
private var initialHasPassed = false
setHandler(in, new InHandler {
override def onPush(): Unit = {
initialHasPassed = true
push(out, grab(in))
}
})
final override protected def onTimer(key: Any): Unit =
if (!initialHasPassed)
failStage(new TimeoutException(s"The first element has not yet passed through in $timeout."))
scheduleOnce("InitialTimeout", timeout)
}
override def toString = "InitialTimeoutTimer"
}
private class CompletionTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
})
final override protected def onTimer(key: Any): Unit =
failStage(new TimeoutException(s"The stream has not been completed in $timeout."))
scheduleOnce("CompletionTimeoutTimer", timeout)
}
override def toString = "CompletionTimeout"
}
private class IdleTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
private var nextDeadline: Deadline = Deadline.now + timeout
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
setHandler(in, new InHandler {
override def onPush(): Unit = {
nextDeadline = Deadline.now + timeout
push(out, grab(in))
}
})
final override protected def onTimer(key: Any): Unit =
if (nextDeadline.isOverdue())
failStage(new TimeoutException(s"No elements passed in the last $timeout."))
schedulePeriodically("IdleTimeoutCheckTimer", interval = idleTimeoutCheckInterval(timeout))
}
override def toString = "IdleTimeout"
}
private class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] {
val in1 = Inlet[I]("in1")
val in2 = Inlet[O]("in2")
val out1 = Outlet[I]("out1")
val out2 = Outlet[O]("out2")
val shape = BidiShape(in1, out1, in2, out2)
override def toString = "IdleTimeoutBidi"
override def createLogic: GraphStageLogic = new GraphStageLogic {
private var nextDeadline: Deadline = Deadline.now + timeout
setHandler(in1, new InHandler {
override def onPush(): Unit = {
onActivity()
push(out1, grab(in1))
}
override def onUpstreamFinish(): Unit = complete(out1)
})
setHandler(in2, new InHandler {
override def onPush(): Unit = {
onActivity()
push(out2, grab(in2))
}
override def onUpstreamFinish(): Unit = complete(out2)
})
setHandler(out1, new OutHandler {
override def onPull(): Unit = pull(in1)
override def onDownstreamFinish(): Unit = cancel(in1)
})
setHandler(out2, new OutHandler {
override def onPull(): Unit = pull(in2)
override def onDownstreamFinish(): Unit = cancel(in2)
})
private def onActivity(): Unit = nextDeadline = Deadline.now + timeout
final override def onTimer(key: Any): Unit =
if (nextDeadline.isOverdue())
failStage(new TimeoutException(s"No elements passed in the last $timeout."))
schedulePeriodically("IdleTimeoutCheckTimer", idleTimeoutCheckInterval(timeout))
}
}
}

View file

@ -9,6 +9,7 @@ import akka.stream._
import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.fusing.{ DropWithin, TakeWithin, GroupedWithin }
import akka.stream.impl.{ Stages, StreamLayout }
import akka.stream.stage._
import akka.util.Collections.EmptyImmutableSeq
@ -32,41 +33,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
private[stream] def isIdentity: Boolean = this.module.isInstanceOf[Stages.Identity]
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flows value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*/
def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Flow[In, T, Mat] = viaMat(flow)(Keep.left)
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*/
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] = {
if (this.isIdentity) {
val flowInstance: Flow[In, T, Mat2] = if (flow.isInstanceOf[javadsl.Flow[In, T, Mat2]])
@ -378,12 +344,48 @@ case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module)
* Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only.
*/
trait FlowOps[+Out, +Mat] {
import FlowOps._
import akka.stream.impl.Stages._
type Repr[+O, +M] <: FlowOps[O, M]
private final val _identity = (x: Any) x
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flows value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*/
def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T, Mat] = viaMat(flow)(Keep.left)
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*/
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Repr[T, Mat3]
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
@ -650,31 +652,10 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream completes
*/
def groupedWithin(n: Int, d: FiniteDuration): Repr[Out, Mat]#Repr[immutable.Seq[Out], Mat] = {
def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out], Mat] = {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
withAttributes(name("groupedWithin")).timerTransform(() new TimerTransformer[Out, immutable.Seq[Out]] {
schedulePeriodically(GroupedWithinTimerKey, d)
var buf: Vector[Out] = Vector.empty
def onNext(in: Out) = {
buf :+= in
if (buf.size == n) {
// start new time window
schedulePeriodically(GroupedWithinTimerKey, d)
emitGroup()
} else Nil
}
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
def onTimer(timerKey: Any) = emitGroup()
private def emitGroup(): immutable.Seq[immutable.Seq[Out]] =
if (buf.isEmpty) EmptyImmutableSeq
else {
val group = buf
buf = Vector.empty
List(group)
}
})
via(new GroupedWithin[Out](n, d).withAttributes(name("groupedWithin")))
}
/**
@ -702,21 +683,8 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
def dropWithin(d: FiniteDuration): Repr[Out, Mat]#Repr[Out, Mat] =
withAttributes(name("dropWithin")).timerTransform(() new TimerTransformer[Out, Out] {
scheduleOnce(DropWithinTimerKey, d)
var delegate: TransformerLike[Out, Out] =
new TransformerLike[Out, Out] {
def onNext(in: Out) = Nil
}
def onNext(in: Out) = delegate.onNext(in)
def onTimer(timerKey: Any) = {
delegate = FlowOps.identityTransformer[Out]
Nil
}
})
def dropWithin(d: FiniteDuration): Repr[Out, Mat] =
via(new DropWithin[Out](d).withAttributes(name("dropWithin")))
/**
* Terminate processing (and cancel the upstream publisher) after the given
@ -754,19 +722,8 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels or timer fires
*/
def takeWithin(d: FiniteDuration): Repr[Out, Mat]#Repr[Out, Mat] =
withAttributes(name("takeWithin")).timerTransform(() new TimerTransformer[Out, Out] {
scheduleOnce(TakeWithinTimerKey, d)
var delegate: TransformerLike[Out, Out] = FlowOps.identityTransformer[Out]
override def onNext(in: Out) = delegate.onNext(in)
override def isComplete = delegate.isComplete
override def onTimer(timerKey: Any) = {
delegate = FlowOps.completedTransformer[Out]
Nil
}
})
def takeWithin(d: FiniteDuration): Repr[Out, Mat] =
via(new TakeWithin[Out](d).withAttributes(name("takeWithin")))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
@ -1008,35 +965,6 @@ trait FlowOps[+Out, +Mat] {
throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]")
}
/**
* INTERNAL API - meant for removal / rewrite. See https://github.com/akka/akka/issues/16393
*
* Transformation of a stream, with additional support for scheduled events.
*
* For each element the [[akka.stream.TransformerLike#onNext]]
* function is invoked, expecting a (possibly empty) sequence of output elements
* to be produced.
* After handing off the elements produced from one input element to the downstream
* subscribers, the [[akka.stream.TransformerLike#isComplete]] predicate determines whether to end
* stream processing at this point; in that case the upstream subscription is
* canceled. Before signaling normal completion to the downstream subscribers,
* the [[akka.stream.TransformerLike#onTermination]] function is invoked to produce a (possibly empty)
* sequence of elements in response to the end-of-stream event.
*
* [[akka.stream.TransformerLike#onError]] is called when failure is signaled from upstream.
*
* After normal completion or failure the [[akka.stream.TransformerLike#cleanup]] function is called.
*
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
* therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation.
*/
private[akka] def timerTransform[U](mkStage: () TimerTransformer[Out, U]): Repr[U, Mat] =
andThen(TimerTransform(mkStage.asInstanceOf[() TimerTransformer[Any, Any]]))
/**
* Logs elements flowing through the stream as well as completion and erroring.
*
@ -1065,24 +993,3 @@ trait FlowOps[+Out, +Mat] {
private[scaladsl] def andThenMat[U, Mat2](op: MaterializingStageFactory): Repr[U, Mat2]
}
/**
* INTERNAL API
*/
private[stream] object FlowOps {
private case object TakeWithinTimerKey
private case object DropWithinTimerKey
private case object GroupedWithinTimerKey
private[this] final case object CompletedTransformer extends TransformerLike[Any, Any] {
override def onNext(elem: Any) = Nil
override def isComplete = true
}
private[this] final case object IdentityTransformer extends TransformerLike[Any, Any] {
override def onNext(elem: Any) = List(elem)
}
def completedTransformer[T]: TransformerLike[T, T] = CompletedTransformer.asInstanceOf[TransformerLike[T, T]]
def identityTransformer[T]: TransformerLike[T, T] = IdentityTransformer.asInstanceOf[TransformerLike[T, T]]
}

View file

@ -572,6 +572,8 @@ object FlowGraph extends GraphApply {
}
}
// Although Mat is always Unit, it cannot be removed as a type parameter, otherwise the "override type"
// won't work below
class PortOps[Out, Mat](val outlet: Outlet[Out], b: Builder[_]) extends FlowOps[Out, Mat] with CombinerBase[Out] {
override type Repr[+O, +M] = PortOps[O, M] @uncheckedVariance
@ -590,10 +592,19 @@ object FlowGraph extends GraphApply {
}
override def importAndGetPort(b: Builder[_]): Outlet[Out] = outlet
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T, Mat] =
super.~>(flow)(b).asInstanceOf[Repr[T, Mat]]
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3) =
throw new UnsupportedOperationException("Cannot use viaMat on a port")
}
class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) {
override def importAndGetPort(b: Builder[_]): Outlet[Out] = throw new IllegalArgumentException(msg)
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3) =
throw new IllegalArgumentException(msg)
}
implicit class ReversePortOps[In](val inlet: Inlet[In]) extends ReverseCombinerBase[In] {

View file

@ -29,14 +29,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
override val shape: SourceShape[Out] = module.shape.asInstanceOf[SourceShape[Out]]
/**
* Transform this [[akka.stream.scaladsl.Source]] by appending the given processing stages.
*/
def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Source[T, Mat] = viaMat(flow)(Keep.left)
/**
* Transform this [[akka.stream.scaladsl.Source]] by appending the given processing stages.
*/
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Source[T, Mat3] = {
if (flow.module.isInstanceOf[Stages.Identity]) this.asInstanceOf[Source[T, Mat3]]
else {

View file

@ -3,11 +3,15 @@
*/
package akka.stream.stage
import akka.actor.{ Cancellable, DeadLetterSuppression }
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.{ GraphModule, GraphInterpreter }
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
/**
* A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes
* its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing
@ -51,6 +55,17 @@ abstract class GraphStage[S <: Shape] extends Graph[S, Unit] {
}
}
private object TimerMessages {
final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression
sealed trait Queued
final case class QueuedSchedule(timerKey: Any, initialDelay: FiniteDuration, interval: FiniteDuration) extends Queued
final case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued
final case class QueuedCancelTimer(timerKey: Any) extends Queued
final case class Timer(id: Int, task: Cancellable)
}
/**
* Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a
* collection of the following parts:
@ -66,6 +81,19 @@ abstract class GraphStage[S <: Shape] extends Graph[S, Unit] {
*/
abstract class GraphStageLogic {
import GraphInterpreter._
import TimerMessages._
private val keyToTimers = mutable.Map[Any, Timer]()
private val timerIdGen = Iterator from 1
private var queuedTimerEvents = List.empty[Queued]
private var _timerAsyncCallback: AsyncCallback[Scheduled] = _
private def getTimerAsyncCallback: AsyncCallback[Scheduled] = {
if (_timerAsyncCallback eq null)
_timerAsyncCallback = getAsyncCallback(onInternalTimer)
_timerAsyncCallback
}
/**
* INTERNAL API
@ -98,11 +126,17 @@ abstract class GraphStageLogic {
/**
* Assigns callbacks for the events for an [[Inlet]]
*/
final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = inHandlers += in -> handler
final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
handler.ownerStageLogic = this
inHandlers += in -> handler
}
/**
* Assigns callbacks for the events for an [[Outlet]]
*/
final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = outHandlers += out -> handler
final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
handler.ownerStageLogic = this
outHandlers += out -> handler
}
private def conn[T](in: Inlet[T]): Int = inToConn(in)
private def conn[T](out: Outlet[T]): Int = outToConn(out)
@ -112,7 +146,7 @@ abstract class GraphStageLogic {
* There can only be one outstanding request at any given time. The method [[hasBeenPulled()]] can be used
* query whether pull is allowed to be called or not.
*/
final def pull[T](in: Inlet[T]): Unit = {
final protected def pull[T](in: Inlet[T]): Unit = {
require(!hasBeenPulled(in), "Cannot pull port twice")
interpreter.pull(conn(in))
}
@ -120,7 +154,7 @@ abstract class GraphStageLogic {
/**
* Requests to stop receiving events from a given input port.
*/
final def cancel[T](in: Inlet[T]): Unit = interpreter.cancel(conn(in))
final protected def cancel[T](in: Inlet[T]): Unit = interpreter.cancel(conn(in))
/**
* Once the callback [[InHandler.onPush()]] for an input port has been invoked, the element that has been pushed
@ -129,7 +163,7 @@ abstract class GraphStageLogic {
*
* The method [[isAvailable()]] can be used to query if the port has an element that can be grabbed or not.
*/
final def grab[T](in: Inlet[T]): T = {
final protected def grab[T](in: Inlet[T]): T = {
require(isAvailable(in), "Cannot get element from already empty input port")
val connection = conn(in)
val elem = interpreter.connectionStates(connection)
@ -141,7 +175,7 @@ abstract class GraphStageLogic {
* Indicates whether there is already a pending pull for the given input port. If this method returns true
* then [[isAvailable()]] must return false for that same port.
*/
final def hasBeenPulled[T](in: Inlet[T]): Boolean = !interpreter.inAvailable(conn(in))
final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = !interpreter.inAvailable(conn(in))
/**
* Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the
@ -149,7 +183,7 @@ abstract class GraphStageLogic {
*
* If this method returns true then [[hasBeenPulled()]] will return false for that same port.
*/
final def isAvailable[T](in: Inlet[T]): Boolean = {
final protected def isAvailable[T](in: Inlet[T]): Boolean = {
val connection = conn(in)
interpreter.inAvailable(connection) && !(interpreter.connectionStates(connection) == Empty)
}
@ -159,7 +193,7 @@ abstract class GraphStageLogic {
* will fail. There can be only one outstanding push request at any given time. The method [[isAvailable()]] can be
* used to check if the port is ready to be pushed or not.
*/
final def push[T](out: Outlet[T], elem: T): Unit = {
final protected def push[T](out: Outlet[T], elem: T): Unit = {
require(isAvailable(out), "Cannot push port twice")
interpreter.push(conn(out), elem)
}
@ -167,12 +201,12 @@ abstract class GraphStageLogic {
/**
* Signals that there will be no more elements emitted on the given port.
*/
final def complete[T](out: Outlet[T]): Unit = interpreter.complete(conn(out))
final protected def complete[T](out: Outlet[T]): Unit = interpreter.complete(conn(out))
/**
* Signals failure through the given port.
*/
final def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex)
final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex)
/**
* Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called,
@ -213,6 +247,103 @@ abstract class GraphStageLogic {
}
}
private def onInternalTimer(scheduled: Scheduled): Unit = {
val Id = scheduled.timerId
keyToTimers.get(scheduled.timerKey) match {
case Some(Timer(Id, _))
if (!scheduled.repeating) keyToTimers -= scheduled.timerKey
onTimer(scheduled.timerKey)
case _
}
}
/**
* Schedule timer to call [[#onTimer]] periodically with the given interval.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
final protected def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit =
schedulePeriodicallyWithInitialDelay(timerKey, interval, interval)
/**
* Schedule timer to call [[#onTimer]] periodically with the given interval after the specified
* initial delay.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
final protected def schedulePeriodicallyWithInitialDelay(
timerKey: Any,
initialDelay: FiniteDuration,
interval: FiniteDuration): Unit = {
if (interpreter ne null) {
cancelTimer(timerKey)
val id = timerIdGen.next()
val task = interpreter.materializer.schedulePeriodically(initialDelay, interval, new Runnable {
def run() = getTimerAsyncCallback.invoke(Scheduled(timerKey, id, repeating = true))
})
keyToTimers(timerKey) = Timer(id, task)
} else {
queuedTimerEvents = QueuedSchedule(timerKey, initialDelay, interval) :: queuedTimerEvents
}
}
/**
* Schedule timer to call [[#onTimer]] after given delay.
* Any existing timer with the same key will automatically be canceled before
* adding the new timer.
*/
final protected def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit = {
if (interpreter ne null) {
cancelTimer(timerKey)
val id = timerIdGen.next()
val task = interpreter.materializer.scheduleOnce(delay, new Runnable {
def run() = getTimerAsyncCallback.invoke(Scheduled(timerKey, id, repeating = false))
})
keyToTimers(timerKey) = Timer(id, task)
} else {
queuedTimerEvents = QueuedScheduleOnce(timerKey, delay) :: queuedTimerEvents
}
}
/**
* Cancel timer, ensuring that the [[#onTimer]] is not subsequently called.
* @param timerKey key of the timer to cancel
*/
final protected def cancelTimer(timerKey: Any): Unit =
keyToTimers.get(timerKey).foreach { t
t.task.cancel()
keyToTimers -= timerKey
}
/**
* Inquire whether the timer is still active. Returns true unless the
* timer does not exist, has previously been canceled or if it was a
* single-shot timer that was already triggered.
*/
final protected def isTimerActive(timerKey: Any): Boolean = keyToTimers contains timerKey
/**
* Will be called when the scheduled timer is triggered.
* @param timerKey key of the scheduled timer
*/
protected def onTimer(timerKey: Any): Unit = ()
// Internal hooks to avoid reliance on user calling super in preStart
protected[stream] def beforePreStart(): Unit = {
queuedTimerEvents.reverse.foreach {
case QueuedSchedule(timerKey, delay, interval) schedulePeriodicallyWithInitialDelay(timerKey, delay, interval)
case QueuedScheduleOnce(timerKey, delay) scheduleOnce(timerKey, delay)
case QueuedCancelTimer(timerKey) cancelTimer(timerKey)
}
queuedTimerEvents = Nil
}
// Internal hooks to avoid reliance on user calling super in postStop
protected[stream] def afterPostStop(): Unit = {
keyToTimers.foreach { case (_, Timer(_, task)) task.cancel() }
keyToTimers.clear()
}
/**
* Invoked before any external events are processed, at the startup of the stage.
*/
@ -228,6 +359,11 @@ abstract class GraphStageLogic {
* Collection of callbacks for an input port of a [[GraphStage]]
*/
trait InHandler {
/**
* INTERNAL API
*/
private[stream] var ownerStageLogic: GraphStageLogic = _
/**
* Called when the input port has a new element available. The actual element can be retrieved via the
* [[GraphStageLogic.grab()]] method.
@ -237,18 +373,23 @@ trait InHandler {
/**
* Called when the input port is finished. After this callback no other callbacks will be called for this port.
*/
def onUpstreamFinish(): Unit = ()
def onUpstreamFinish(): Unit = ownerStageLogic.completeStage()
/**
* Called when the input port has failed. After this callback no other callbacks will be called for this port.
*/
def onUpstreamFailure(ex: Throwable): Unit = ()
def onUpstreamFailure(ex: Throwable): Unit = ownerStageLogic.failStage(ex)
}
/**
* Collection of callbacks for an output port of a [[GraphStage]]
*/
trait OutHandler {
/**
* INTERNAL API
*/
private[stream] var ownerStageLogic: GraphStageLogic = _
/**
* Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
* is now allowed to be called on this port.
@ -259,5 +400,5 @@ trait OutHandler {
* Called when the output port will no longer accept any new elements. After this callback no other callbacks will
* be called for this port.
*/
def onDownstreamFinish(): Unit = ()
def onDownstreamFinish(): Unit = ownerStageLogic.completeStage()
}

View file

@ -3,7 +3,7 @@
*/
package akka.stream.stage
import akka.stream.{ Materializer, Attributes, Supervision }
import akka.stream.{ Attributes, Materializer, Supervision }
/**
* General interface for stream transformation.