diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala index 86bc87bd41..0b0da58add 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala @@ -48,10 +48,10 @@ class StreamBuffersRateSpec extends AkkaSpec { val zipper = ZipWith[Tick, Int, Int]((tick, count) => count) - Source(initialDelay = 1.second, interval = 1.second, () => "message!") + Source(initialDelay = 1.second, interval = 1.second, "message!") .conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.right - Source(initialDelay = 3.second, interval = 3.second, () => Tick()) ~> zipper.left + Source(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.left zipper.out ~> Sink.foreach(println) } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 952753c9cb..c8677119ef 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -407,15 +407,8 @@ public class SourceTest extends StreamTest { @Test public void mustProduceTicks() throws Exception { final JavaTestKit probe = new JavaTestKit(system); - final Callable tick = new Callable() { - private int count = 1; - - @Override - public String call() { - return "tick-" + (count++); - } - }; - KeyedSource tickSource = Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick); + KeyedSource tickSource = Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), + FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick"); MaterializedMap map = tickSource.to(Sink.foreach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); @@ -423,9 +416,9 @@ public class SourceTest extends StreamTest { })).run(materializer); Cancellable cancellable = map.get(tickSource); // validates we can obtain the cancellable probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS)); - probe.expectMsgEquals("tick-1"); + probe.expectMsgEquals("tick"); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); - probe.expectMsgEquals("tick-2"); + probe.expectMsgEquals("tick"); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala index ee0114e08d..f38a39817b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala @@ -29,8 +29,8 @@ class GraphJunctionAttributesSpec extends AkkaSpec { val source = Source[(SlowTick, List[FastTick])]() { implicit b ⇒ import FlowGraphImplicits._ - val slow = Source(0.seconds, 100.millis, () ⇒ SlowTick) - val fast = Source(0.seconds, 10.millis, () ⇒ FastTick) + val slow = Source(0.seconds, 100.millis, SlowTick) + val fast = Source(0.seconds, 10.millis, FastTick) val sink = UndefinedSink[(SlowTick, List[FastTick])] val zip = Zip[SlowTick, List[FastTick]](inputBuffer(1, 1)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 7012bbc0d6..3573031604 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -5,10 +5,10 @@ package akka.stream.scaladsl import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import akka.stream.FlowMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit +import akka.stream.MaterializerSettings class TickSourceSpec extends AkkaSpec { @@ -16,42 +16,39 @@ class TickSourceSpec extends AkkaSpec { "A Flow based on tick publisher" must { "produce ticks" in { - val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Source(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()).to(Sink(c)).run() + Source(1.second, 500.millis, "tick").to(Sink(c)).run() val sub = c.expectSubscription() sub.request(3) c.expectNoMsg(600.millis) - c.expectNext("tick-1") + c.expectNext("tick") c.expectNoMsg(200.millis) - c.expectNext("tick-2") + c.expectNext("tick") c.expectNoMsg(200.millis) - c.expectNext("tick-3") + c.expectNext("tick") sub.cancel() c.expectNoMsg(200.millis) } "drop ticks when not requested" in { - val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).to(Sink(c)).run() + Source(1.second, 1.second, "tick").to(Sink(c)).run() val sub = c.expectSubscription() sub.request(2) - c.expectNext("tick-1") + c.expectNext("tick") c.expectNoMsg(200.millis) - c.expectNext("tick-2") + c.expectNext("tick") c.expectNoMsg(1400.millis) sub.request(2) - c.expectNext("tick-4") + c.expectNext("tick") c.expectNoMsg(200.millis) - c.expectNext("tick-5") + c.expectNext("tick") sub.cancel() c.expectNoMsg(200.millis) } "reject multiple subscribers, but keep the first" in { - val tickGen = Iterator from 1 - val p = Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).runWith(Sink.publisher) + val p = Source(1.second, 1.second, "tick").runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[String]() val c2 = StreamTestKit.SubscriberProbe[String]() p.subscribe(c1) @@ -59,24 +56,13 @@ class TickSourceSpec extends AkkaSpec { val sub1 = c1.expectSubscription() c2.expectError() sub1.request(1) - c1.expectNext("tick-1") + c1.expectNext("tick") c1.expectNoMsg(200.millis) sub1.request(2) - c1.expectNext("tick-2") + c1.expectNext("tick") sub1.cancel() } - "signal onError when tick closure throws" in { - val c = StreamTestKit.SubscriberProbe[String]() - val tickSource = Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace) - val m = tickSource.to(Sink(c)).run() - val cancellable = m.get(tickSource) - val sub = c.expectSubscription() - sub.request(3) - c.expectError.getMessage should be("tick err") - awaitCond(cancellable.isCancelled) - } - "be usable with zip for a simple form of rate limiting" in { val c = StreamTestKit.SubscriberProbe[Int]() @@ -84,7 +70,7 @@ class TickSourceSpec extends AkkaSpec { import FlowGraphImplicits._ val zip = Zip[Int, String] Source(1 to 100) ~> zip.left - Source(1.second, 1.second, () ⇒ "tick") ~> zip.right + Source(1.second, 1.second, "tick") ~> zip.right zip.out ~> Flow[(Int, String)].map { case (n, _) ⇒ n } ~> Sink(c) }.run() @@ -98,19 +84,18 @@ class TickSourceSpec extends AkkaSpec { } "be possible to cancel" in { - val tickGen = Iterator from 1 val c = StreamTestKit.SubscriberProbe[String]() - val tickSource = Source(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()) + val tickSource = Source(1.second, 500.millis, "tick") val m = tickSource.to(Sink(c)).run() val cancellable = m.get(tickSource) val sub = c.expectSubscription() sub.request(3) c.expectNoMsg(600.millis) - c.expectNext("tick-1") + c.expectNext("tick") c.expectNoMsg(200.millis) - c.expectNext("tick-2") + c.expectNext("tick") c.expectNoMsg(200.millis) - c.expectNext("tick-3") + c.expectNext("tick") cancellable.cancel() awaitCond(cancellable.isCancelled) sub.request(3) diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 48ebdfeba5..b6a9e6db2c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -17,7 +17,7 @@ import scala.util.control.NonFatal * INTERNAL API */ private[akka] object TickPublisher { - def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, + def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any, settings: MaterializerSettings, cancelled: AtomicBoolean): Props = Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)).withDispatcher(settings.dispatcher) @@ -39,11 +39,11 @@ private[akka] object TickPublisher { /** * INTERNAL API * - * Elements are produced from the tick closure periodically with the specified interval. Supports only one subscriber. + * Elements are emitted with the specified interval. Supports only one subscriber. * The subscriber will receive the tick element if it has requested any elements, * otherwise the tick element is dropped. */ -private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, +private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any, settings: MaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher._ @@ -86,10 +86,9 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite def active: Receive = { case Tick ⇒ try { - val tickElement = tick() // FIXME should we call this even if we shouldn't send it? if (demand > 0) { demand -= 1 - tryOnNext(subscriber, tickElement) + tryOnNext(subscriber, tick) } } catch { case NonFatal(e) ⇒ handleError(e) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 3bba5ef823..2171909475 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -95,14 +95,14 @@ object Source { new Source(scaladsl.Source(future)) /** - * Elements are produced from the tick closure periodically with the specified interval. + * Elements are emitted periodically with the specified interval. * The tick element will be delivered to downstream consumers that has requested any elements. * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.KeyedSource[O, Cancellable] = - new KeyedSource(scaladsl.Source(initialDelay, interval, () ⇒ tick.call())) + def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.KeyedSource[O, Cancellable] = + new KeyedSource(scaladsl.Source(initialDelay, interval, tick)) /** * Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index 399aa8179a..df091d1072 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -151,13 +151,13 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS } /** - * Elements are produced from the tick closure periodically with the specified interval. + * Elements are emitted periodically with the specified interval. * The tick element will be delivered to downstream consumers that has requested any elements. * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ -final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends KeyedActorFlowSource[Out, Cancellable] { // FIXME Why does this have anything to do with Actors? +final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out) extends KeyedActorFlowSource[Out, Cancellable] { // FIXME Why does this have anything to do with Actors? override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { val (pub, cancellable) = create(materializer, flowName) pub.subscribe(flowSubscriber) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index e63e797959..39ea49e74a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -130,13 +130,13 @@ object Source { def apply[T](future: Future[T]): Source[T] = FutureSource(future) /** - * Elements are produced from the tick closure periodically with the specified interval. + * Elements are emitted periodically with the specified interval. * The tick element will be delivered to downstream consumers that has requested any elements. * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): TickSource[T] = + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): TickSource[T] = TickSource(initialDelay, interval, tick) /**