From ee5ec725528b9356c9d39642e8035261f594d597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Wed, 4 Nov 2015 11:43:11 +0200 Subject: [PATCH] =str #15707 name unnamed modules * give name attribute to TickSource and nested modules * reflow module toString output * give toString to flow --- .../rst/java/code/docs/MigrationsJava.java | 9 +++++++++ .../rst/java/migration-guide-1.0-2.x-java.rst | 17 +++++++++++------ .../rst/scala/code/docs/MigrationsScala.scala | 3 +-- .../scala/code/docs/stream/FlowDocSpec.scala | 4 ++-- .../docs/stream/StreamBuffersRateSpec.scala | 4 ++-- .../code/docs/stream/StreamTestKitDocSpec.scala | 2 +- .../rst/scala/migration-guide-1.0-2.x-scala.rst | 11 +++++------ .../akka/http/impl/engine/ws/Websocket.scala | 2 +- .../java/akka/stream/javadsl/SourceTest.java | 2 +- .../akka/stream/scaladsl/TickSourceSpec.scala | 10 +++++----- .../scala/akka/stream/impl/StreamLayout.scala | 10 ++++------ .../main/scala/akka/stream/javadsl/Source.scala | 4 ++-- .../main/scala/akka/stream/scaladsl/Flow.scala | 2 ++ .../scala/akka/stream/scaladsl/Source.scala | 4 ++-- 14 files changed, 48 insertions(+), 36 deletions(-) diff --git a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java index a2c3b0eefe..af191b552a 100644 --- a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java +++ b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java @@ -1,13 +1,17 @@ package docs; +import akka.actor.Cancellable; import akka.japi.Pair; import akka.japi.function.Function; import akka.stream.*; import akka.stream.javadsl.*; import scala.Option; +import scala.concurrent.duration.FiniteDuration; import scala.concurrent.Promise; import scala.runtime.BoxedUnit; +import java.util.concurrent.TimeUnit; + public class MigrationsJava { // This is compile-only code, no need for actually running anything. @@ -110,6 +114,11 @@ public class MigrationsJava { Source>> src = Source.maybe(); // Complete the promise with an empty option to emulate the old lazyEmpty promise.trySuccess(scala.Option.empty()); + + final Source sourceUnderTest = Source.tick( + FiniteDuration.create(0, TimeUnit.MILLISECONDS), + FiniteDuration.create(200, TimeUnit.MILLISECONDS), + "tick"); //#source-creators //#empty-flow diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index e081d33f65..2a5ae42302 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -213,17 +213,16 @@ Source constructor name changes zero elements by providing an ``Option``. This is different from ``lazyEmpty`` which only allowed completion to be sent, but no elements. -The ``apply()`` and ``from()`` overloads on ``Source`` that provide a tick source (``Source(delay,interval,tick)``) -are replaced by the named method ``Source.tick()`` to reduce the number of overloads and to make the function more +The ``from()`` overload on ``Source`` that provide a tick source (``Source.from(delay,interval,tick)``) +is replaced by the named method ``Source.tick()`` to reduce the number of overloads and to make the function more discoverable. Update procedure ---------------- -1. Replace all uses of ``Source(delay,interval,tick)`` and ``Source.from(delay,interval,tick)`` with the method - ``Source.tick()`` -2. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with +1. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with a ``None`` (an empty ``Option``) +2. Replace all uses of ``Source.from(delay,interval,tick)`` with the method ``Source.tick(delay,interval,tick)`` Example ^^^^^^^ @@ -235,6 +234,12 @@ Example //... promise.trySuccess(BoxedUnit.UNIT); + // This no longer works! + final Source sourceUnderTest = Source.from( + FiniteDuration.create(0, TimeUnit.MILLISECONDS), + FiniteDuration.create(200, TimeUnit.MILLISECONDS), + "tick"); + should be replaced by .. includecode:: code/docs/MigrationsJava.java#source-creators @@ -357,4 +362,4 @@ We show the necessary steps in terms of an example ``AsyncStage`` Example ^^^^^^^ -TODO \ No newline at end of file +TODO diff --git a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala index 12f770abe6..51caa68b15 100644 --- a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala +++ b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala @@ -106,8 +106,7 @@ class MigrationsScala extends AkkaSpec { // This finishes the stream without emitting anything, just like Source.lazyEmpty did promise.trySuccess(Some(())) - // FIXME: After https://github.com/akka/akka/pull/18792 merged - val ticks = Source(1.second, 3.seconds, "tick") + val ticks = Source.tick(1.second, 3.seconds, "tick") //#source-creators //#flatMapConcat diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 3da3a92cf4..55792edd87 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -77,7 +77,7 @@ class FlowDocSpec extends AkkaSpec { import scala.concurrent.duration._ case object Tick - val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () => Tick) + val timer = Source.tick(initialDelay = 1.second, interval = 1.seconds, tick = () => Tick) val timerCancel: Cancellable = Sink.ignore.runWith(timer) timerCancel.cancel() @@ -148,7 +148,7 @@ class FlowDocSpec extends AkkaSpec { "various ways of transforming materialized values" in { import scala.concurrent.duration._ - val throttler = Flow.fromGraph(FlowGraph.create(Source(1.second, 1.second, "test")) { implicit builder => + val throttler = Flow.fromGraph(FlowGraph.create(Source.tick(1.second, 1.second, "test")) { implicit builder => tickSource => import FlowGraph.Implicits._ val zip = builder.add(ZipWith[String, Int, Int](Keep.right)) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala index b9979f880f..66eb995893 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala @@ -44,9 +44,9 @@ class StreamBuffersRateSpec extends AkkaSpec { val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count)) - Source(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0 + Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0 - Source(initialDelay = 1.second, interval = 1.second, "message!") + Source.tick(initialDelay = 1.second, interval = 1.second, "message!") .conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1 zipper.out ~> Sink.foreach(println) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala index 1bbaea1a4a..25a2be694e 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala @@ -66,7 +66,7 @@ class StreamTestKitDocSpec extends AkkaSpec { "sink actor ref" in { //#sink-actorref case object Tick - val sourceUnderTest = Source(0.seconds, 200.millis, Tick) + val sourceUnderTest = Source.tick(0.seconds, 200.millis, Tick) val probe = TestProbe() val cancellable = sourceUnderTest.to(Sink.actorRef(probe.ref, "completed")).run() diff --git a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst index 7b516ed45d..f19d68dad1 100644 --- a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst @@ -222,17 +222,16 @@ Source constructor name changes zero elements by providing an ``Option``. This is different from ``lazyEmpty`` which only allowed completion to be sent, but no elements. -The ``apply()`` and ``from()`` overloads on ``Source`` that provide a tick source (``Source(delay,interval,tick)``) -are replaced by the named method ``Source.tick()`` to reduce the number of overloads and to make the function more +The ``apply()`` overload on ``Source`` that provide a tick source (``Source(delay,interval,tick)``) +is replaced by the named method ``Source.tick()`` to reduce the number of overloads and to make the function more discoverable. Update procedure ---------------- -1. Replace all uses of ``Source(delay,interval,tick)`` and ``Source.from(delay,interval,tick)`` with the method - ``Source.tick()`` -2. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with +1. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with a ``None`` (an empty ``Option``) +2. Replace all uses of ``Source(delay,interval,tick)`` with the method ``Source.tick(delay,interval,tick)`` Example ^^^^^^^ @@ -409,4 +408,4 @@ Example should be replaced by -.. includecode:: code/docs/MigrationsScala.scala#port-async \ No newline at end of file +.. includecode:: code/docs/MigrationsScala.scala#port-async diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index 0ac57b7a5d..afdf8006df 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -125,7 +125,7 @@ private[http] object Websocket { import FlowGraph.Implicits._ val split = b.add(BypassRouter) - val tick = Source(closeTimeout, closeTimeout, Tick) + val tick = Source.tick(closeTimeout, closeTimeout, Tick) val merge = b.add(BypassMerge) val messagePreparation = b.add(prepareMessages) val messageRendering = b.add(renderMessages.via(LiftCompletions)) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index d41715bc9c..ad200ba1f1 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -420,7 +420,7 @@ public class SourceTest extends StreamTest { @Test public void mustProduceTicks() throws Exception { final JavaTestKit probe = new JavaTestKit(system); - Source tickSource = Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), + Source tickSource = Source.tick(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick"); Cancellable cancellable = tickSource.to(Sink.foreach(new Procedure() { public void apply(String elem) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index de6442369d..08f5449d7a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -18,7 +18,7 @@ class TickSourceSpec extends AkkaSpec { "A Flow based on tick publisher" must { "produce ticks" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[String]() - Source(1.second, 500.millis, "tick").to(Sink(c)).run() + Source.tick(1.second, 500.millis, "tick").to(Sink(c)).run() val sub = c.expectSubscription() sub.request(3) c.expectNoMsg(600.millis) @@ -33,7 +33,7 @@ class TickSourceSpec extends AkkaSpec { "drop ticks when not requested" in { val c = TestSubscriber.manualProbe[String]() - Source(1.second, 1.second, "tick").to(Sink(c)).run() + Source.tick(1.second, 1.second, "tick").to(Sink(c)).run() val sub = c.expectSubscription() sub.request(2) c.expectNext("tick") @@ -49,7 +49,7 @@ class TickSourceSpec extends AkkaSpec { } "reject multiple subscribers, but keep the first" in { - val p = Source(1.second, 1.second, "tick").runWith(Sink.publisher) + val p = Source.tick(1.second, 1.second, "tick").runWith(Sink.publisher) val c1 = TestSubscriber.manualProbe[String]() val c2 = TestSubscriber.manualProbe[String]() p.subscribe(c1) @@ -71,7 +71,7 @@ class TickSourceSpec extends AkkaSpec { import FlowGraph.Implicits._ val zip = b.add(Zip[Int, String]()) Source(1 to 100) ~> zip.in0 - Source(1.second, 1.second, "tick") ~> zip.in1 + Source.tick(1.second, 1.second, "tick") ~> zip.in1 zip.out ~> Flow[(Int, String)].map { case (n, _) ⇒ n } ~> Sink(c) ClosedShape }).run() @@ -87,7 +87,7 @@ class TickSourceSpec extends AkkaSpec { "be possible to cancel" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[String]() - val tickSource = Source(1.second, 500.millis, "tick") + val tickSource = Source.tick(1.second, 500.millis, "tick") val cancellable = tickSource.to(Sink(c)).run() val sub = c.expectSubscription() sub.request(3) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 674f4a309e..ad15a86802 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -335,12 +335,10 @@ private[akka] object StreamLayout { override def toString = s""" | Module: ${this.attributes.nameOrDefault("unnamed")} - | Modules: ${subModules.iterator.map(m ⇒ " " + m.attributes.nameOrDefault(m.getClass.getName)).mkString("\n")} - | Downstreams: - | ${downstreams.iterator.map { case (in, out) ⇒ s" $in -> $out" }.mkString("\n")} - | Upstreams: - | ${upstreams.iterator.map { case (out, in) ⇒ s" $out -> $in" }.mkString("\n")} - """.stripMargin + | Modules: ${subModules.iterator.map(m ⇒ "\n " + m.attributes.nameOrDefault(m.getClass.getName)).mkString("")} + | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} + | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} + |""".stripMargin } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 4a54467dd2..95a7829146 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -123,8 +123,8 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.Source[O, Cancellable] = - new Source(scaladsl.Source(initialDelay, interval, tick)) + def tick[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.Source[O, Cancellable] = + new Source(scaladsl.Source.tick(initialDelay, interval, tick)) /** * Create a `Source` with one element. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 652744f6c7..4bc9f4336b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -237,6 +237,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) + + override def toString = s"""Flow(${module})""" } object Flow { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7a287147f8..0de8b3b0e1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -204,8 +204,8 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = - fromGraph(new TickSource[T](initialDelay, interval, tick)) + def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = + fromGraph(new TickSource[T](initialDelay, interval, tick).withAttributes(DefaultAttributes.tickSource)) /** * Create a `Source` with one element.