diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index bd2388856e..e335b19d0c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -21,6 +21,7 @@ import akka.actor.ExtendedActorSystem import akka.actor.ActorSystem import akka.actor.Extension import akka.stream.actor.ActorConsumer +import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -108,6 +109,11 @@ private[akka] object Ast { name = s"$flowName-0-future"), Some(future)) } } + final case class TickProducerNode[I](interval: FiniteDuration, tick: () ⇒ I) extends ProducerNode[I] { + def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] = + new ActorProducer(materializer.context.actorOf(TickProducer.props(interval, tick, materializer.settings), + name = s"$flowName-0-tick")) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/TickProducer.scala new file mode 100644 index 0000000000..99a4e53f61 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/TickProducer.scala @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.collection.mutable +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import org.reactivestreams.spi.Subscriber +import org.reactivestreams.spi.Subscription +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.SupervisorStrategy +import akka.stream.MaterializerSettings +import scala.util.control.NonFatal +import akka.actor.Cancellable + +/** + * INTERNAL API + */ +private[akka] object TickProducer { + def props(interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings): Props = + Props(new TickProducer(interval, tick, settings)).withDispatcher(settings.dispatcher) + + object TickProducerSubscription { + case class Cancel(subscriber: Subscriber[Any]) + case class RequestMore(elements: Int, subscriber: Subscriber[Any]) + } + + class TickProducerSubscription(ref: ActorRef, subscriber: Subscriber[Any]) + extends Subscription { + import TickProducerSubscription._ + def cancel(): Unit = ref ! Cancel(subscriber) + def requestMore(elements: Int): Unit = + if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + else ref ! RequestMore(elements, subscriber) + override def toString = "TickProducerSubscription" + } + + private case object Tick +} + +/** + * INTERNAL API + * + * Elements are produced from the tick closure periodically with the specified interval. + * Each subscriber will receive the tick element if it has requested any elements, + * otherwise the tick element is dropped for that subscriber. + */ +private[akka] class TickProducer(interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings) extends Actor with SoftShutdown { + import TickProducer._ + import TickProducer.TickProducerSubscription._ + + var exposedPublisher: ActorPublisher[Any] = _ + val demand = mutable.Map.empty[Subscriber[Any], Long] + + override val supervisorStrategy = SupervisorStrategy.stoppingStrategy + + var tickTask: Option[Cancellable] = None + + def receive = { + case ExposedPublisher(publisher) ⇒ + exposedPublisher = publisher + context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) + context.become(waitingForFirstSubscriber) + case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") + } + + def waitingForFirstSubscriber: Receive = { + case SubscribePending ⇒ + exposedPublisher.takePendingSubscribers() foreach registerSubscriber + context.setReceiveTimeout(Duration.Undefined) + import context.dispatcher + tickTask = Some(context.system.scheduler.schedule(interval, interval, self, Tick)) + context.become(active) + } + + def active: Receive = { + case Tick ⇒ + ActorBasedFlowMaterializer.withCtx(context) { + try { + val tickElement = tick() + demand foreach { + case (subscriber, d) ⇒ + if (d > 0) { + demand(subscriber) = d - 1 + subscriber.onNext(tickElement) + } + } + } catch { + case NonFatal(e) ⇒ + // tick closure throwed => onError downstream + demand foreach { case (subscriber, _) ⇒ subscriber.onError(e) } + } + } + + case RequestMore(elements, subscriber) ⇒ + demand.get(subscriber) match { + case Some(d) ⇒ demand(subscriber) = d + elements + case None ⇒ // canceled + } + case Cancel ⇒ + softShutdown() + + case SubscribePending ⇒ + exposedPublisher.takePendingSubscribers() foreach registerSubscriber + + } + + def registerSubscriber(subscriber: Subscriber[Any]): Unit = { + if (demand.contains(subscriber)) + subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) + else { + val subscription = new TickProducerSubscription(self, subscriber) + demand(subscriber) = 0 + subscriber.onSubscribe(subscription) + } + } + + private def unregisterSubscriber(subscriber: Subscriber[Any]): Unit = { + demand -= subscriber + if (demand.isEmpty) { + exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) + softShutdown() + } + } + + override def postStop(): Unit = { + tickTask.foreach(_.cancel) + if (exposedPublisher ne null) + exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) + } + +} + diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 8a462ca683..c2bc467d17 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -19,6 +19,7 @@ import akka.japi.Util.immutableSeq import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.{ Flow ⇒ SFlow } import org.reactivestreams.api.Consumer +import scala.concurrent.duration.FiniteDuration /** * Java API @@ -64,6 +65,16 @@ object Flow { */ def create[T](block: Callable[T]): Flow[T] = new FlowAdapter(SFlow.apply(() ⇒ block.call())) + /** + * Elements are produced from the tick `Callable` periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. + */ + def create[T](interval: FiniteDuration, tick: Callable[T]): Flow[T] = + new FlowAdapter(SFlow.apply(interval, () ⇒ tick.call())) + } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 9a175c1fab..dc63b98614 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -13,6 +13,8 @@ import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transf import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } import akka.stream.impl.Ast.FutureProducerNode import akka.stream.impl.FlowImpl +import akka.stream.impl.Ast.TickProducerNode +import scala.concurrent.duration.FiniteDuration /** * Scala API @@ -59,6 +61,15 @@ object Flow { */ def apply[T](future: Future[T]): Flow[T] = FlowImpl(FutureProducerNode(future), Nil) + /** + * Elements are produced from the tick closure periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. + */ + def apply[T](interval: FiniteDuration, tick: () ⇒ T): Flow[T] = FlowImpl(TickProducerNode(interval, tick), Nil) + } /** diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index d4f9cfcbfd..92fc309427 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -495,4 +495,26 @@ public class FlowTest { String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals("A", result); } + @Test + public void mustProduceTicks() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Callable tick = new Callable() { + private int count = 1; + + @Override + public String call() { + return "tick-" + (count++); + } + }; + Flow.create(FiniteDuration.create(1, TimeUnit.SECONDS), tick).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); + probe.expectMsgEquals("tick-1"); + probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + probe.expectMsgEquals("tick-2"); + probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + + } } diff --git a/akka-stream/src/test/scala/akka/stream/TickProducerSpec.scala b/akka-stream/src/test/scala/akka/stream/TickProducerSpec.scala new file mode 100644 index 0000000000..a61a3280e3 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/TickProducerSpec.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import scala.util.control.NoStackTrace + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TickProducerSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings( + dispatcher = "akka.test.stream-dispatcher")) + + "A Flow based on tick producer" must { + "produce ticks" in { + val tickGen = Iterator from 1 + val c = StreamTestKit.consumerProbe[String] + Flow(1.second, () ⇒ "tick-" + tickGen.next()).produceTo(materializer, c) + val sub = c.expectSubscription() + sub.requestMore(3) + c.expectNext("tick-1") + c.expectNoMsg(200.millis) + c.expectNext("tick-2") + c.expectNoMsg(200.millis) + c.expectNext("tick-3") + sub.cancel() + c.expectNoMsg(200.millis) + } + + "drop ticks when not requested" in { + val tickGen = Iterator from 1 + val c = StreamTestKit.consumerProbe[String] + Flow(1.second, () ⇒ "tick-" + tickGen.next()).produceTo(materializer, c) + val sub = c.expectSubscription() + sub.requestMore(2) + c.expectNext("tick-1") + c.expectNoMsg(200.millis) + c.expectNext("tick-2") + c.expectNoMsg(1400.millis) + sub.requestMore(2) + c.expectNext("tick-4") + c.expectNoMsg(200.millis) + c.expectNext("tick-5") + sub.cancel() + c.expectNoMsg(200.millis) + } + + "produce ticks with multiple subscribers" in { + val tickGen = Iterator from 1 + val p = Flow(1.second, () ⇒ "tick-" + tickGen.next()).toProducer(materializer) + val c1 = StreamTestKit.consumerProbe[String] + val c2 = StreamTestKit.consumerProbe[String] + p.produceTo(c1) + p.produceTo(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.requestMore(1) + sub2.requestMore(2) + c1.expectNext("tick-1") + c2.expectNext("tick-1") + c2.expectNoMsg(200.millis) + c2.expectNext("tick-2") + c1.expectNoMsg(200.millis) + sub1.requestMore(2) + sub2.requestMore(2) + c1.expectNext("tick-3") + c2.expectNext("tick-3") + sub1.cancel() + sub2.cancel() + } + + "signal onError when tick closure throws" in { + val tickGen = Iterator from 1 + val c = StreamTestKit.consumerProbe[String] + Flow(1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).produceTo(materializer, c) + val sub = c.expectSubscription() + sub.requestMore(3) + c.expectError.getMessage should be("tick err") + } + + "be usable with zip for a simple form of rate limiting" in { + val c = StreamTestKit.consumerProbe[Int] + val rate = Flow(1.second, () ⇒ "tick").toProducer(materializer) + Flow(1 to 100).zip(rate).map { case (n, _) ⇒ n }.produceTo(materializer, c) + val sub = c.expectSubscription() + sub.requestMore(1000) + c.expectNext(1) + c.expectNoMsg(200.millis) + c.expectNext(2) + c.expectNoMsg(200.millis) + sub.cancel() + } + + } +} \ No newline at end of file