diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 68b9c7def7..91b3f5145e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -111,9 +111,9 @@ private[akka] object Ast { name = s"$flowName-0-future"), Some(future)) } } - final case class TickPublisherNode[I](interval: FiniteDuration, tick: () ⇒ I) extends PublisherNode[I] { + final case class TickPublisherNode[I](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ I) extends PublisherNode[I] { def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = - ActorPublisher[I](materializer.context.actorOf(TickPublisher.props(interval, tick, materializer.settings), + ActorPublisher[I](materializer.context.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings), name = s"$flowName-0-tick")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index cc0e23fba5..79a15831b7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -15,8 +15,8 @@ import scala.util.control.NonFatal * INTERNAL API */ private[akka] object TickPublisher { - def props(interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings): Props = - Props(new TickPublisher(interval, tick, settings)).withDispatcher(settings.dispatcher) + def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings): Props = + Props(new TickPublisher(initialDelay, interval, tick, settings)).withDispatcher(settings.dispatcher) object TickPublisherSubscription { case class Cancel(subscriber: Subscriber[Any]) @@ -42,7 +42,7 @@ private[akka] object TickPublisher { * Each subscriber will receive the tick element if it has requested any elements, * otherwise the tick element is dropped for that subscriber. */ -private[akka] class TickPublisher(interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings) extends Actor with SoftShutdown { +private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings) extends Actor with SoftShutdown { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher._ @@ -66,7 +66,7 @@ private[akka] class TickPublisher(interval: FiniteDuration, tick: () ⇒ Any, se exposedPublisher.takePendingSubscribers() foreach registerSubscriber context.setReceiveTimeout(Duration.Undefined) import context.dispatcher - tickTask = Some(context.system.scheduler.schedule(interval, interval, self, Tick)) + tickTask = Some(context.system.scheduler.schedule(initialDelay, interval, self, Tick)) context.become(active) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 3e69175906..28bb10e134 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -67,13 +67,13 @@ object Flow { /** * Elements are produced from the tick `Callable` periodically with the specified interval. - * The tick element will be delivered to downstream subscribers that has requested any elements. - * If a subscriber has not requested any elements at the point in time when the tick + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def create[T](interval: FiniteDuration, tick: Callable[T]): Flow[T] = - new FlowAdapter(SFlow.apply(interval, () ⇒ tick.call())) + def create[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[T]): Flow[T] = + new FlowAdapter(SFlow.apply(initialDelay, interval, () ⇒ tick.call())) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 68044f0b47..ab1aed659c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -62,13 +62,12 @@ object Flow { /** * Elements are produced from the tick closure periodically with the specified interval. - * The tick element will be delivered to downstream subscribers that has requested any elements. - * If a subscriber has not requested any elements at the point in time when the tick + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def apply[T](interval: FiniteDuration, tick: () ⇒ T): Flow[T] = FlowImpl(TickPublisherNode(interval, tick), Nil) - + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Flow[T] = FlowImpl(TickPublisherNode(initialDelay, interval, tick), Nil) } /** diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index b67b54428c..312b119e2c 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -493,11 +493,12 @@ public class FlowTest { return "tick-" + (count++); } }; - Flow.create(FiniteDuration.create(1, TimeUnit.SECONDS), tick).foreach(new Procedure() { - public void apply(String elem) { - probe.getRef().tell(elem, ActorRef.noSender()); + Flow.create(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); } }, materializer); + probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS)); probe.expectMsgEquals("tick-1"); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); probe.expectMsgEquals("tick-2"); diff --git a/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala index 3c75b02661..9988fe9f33 100644 --- a/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/TickPublisherSpec.scala @@ -19,9 +19,10 @@ class TickPublisherSpec extends AkkaSpec { "produce ticks" in { val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Flow(1.second, () ⇒ "tick-" + tickGen.next()).produceTo(c, materializer) + Flow(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(3) + c.expectNoMsg(600.millis) c.expectNext("tick-1") c.expectNoMsg(200.millis) c.expectNext("tick-2") @@ -34,7 +35,7 @@ class TickPublisherSpec extends AkkaSpec { "drop ticks when not requested" in { val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Flow(1.second, () ⇒ "tick-" + tickGen.next()).produceTo(c, materializer) + Flow(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(2) c.expectNext("tick-1") @@ -51,7 +52,7 @@ class TickPublisherSpec extends AkkaSpec { "produce ticks with multiple subscribers" in { val tickGen = Iterator from 1 - val p = Flow(1.second, () ⇒ "tick-" + tickGen.next()).toPublisher(materializer) + val p = Flow(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).toPublisher(materializer) val c1 = StreamTestKit.SubscriberProbe[String]() val c2 = StreamTestKit.SubscriberProbe[String]() p.subscribe(c1) @@ -75,7 +76,7 @@ class TickPublisherSpec extends AkkaSpec { "signal onError when tick closure throws" in { val c = StreamTestKit.SubscriberProbe[String]() - Flow(1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).produceTo(c, materializer) + Flow(1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).produceTo(c, materializer) val sub = c.expectSubscription() sub.request(3) c.expectError.getMessage should be("tick err") @@ -83,7 +84,7 @@ class TickPublisherSpec extends AkkaSpec { "be usable with zip for a simple form of rate limiting" in { val c = StreamTestKit.SubscriberProbe[Int]() - val rate = Flow(1.second, () ⇒ "tick").toPublisher(materializer) + val rate = Flow(1.second, 1.second, () ⇒ "tick").toPublisher(materializer) Flow(1 to 100).zip(rate).map { case (n, _) ⇒ n }.produceTo(c, materializer) val sub = c.expectSubscription() sub.request(1000)