=str #15707 name unnamed modules

* give name attribute to TickSource and nested modules
* reflow module toString output
* give toString to flow
This commit is contained in:
Martynas Mickevičius 2015-11-04 11:43:11 +02:00
parent 3b9b95b0eb
commit ee5ec72552
14 changed files with 48 additions and 36 deletions

View file

@ -1,13 +1,17 @@
package docs;
import akka.actor.Cancellable;
import akka.japi.Pair;
import akka.japi.function.Function;
import akka.stream.*;
import akka.stream.javadsl.*;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.Promise;
import scala.runtime.BoxedUnit;
import java.util.concurrent.TimeUnit;
public class MigrationsJava {
// This is compile-only code, no need for actually running anything.
@ -110,6 +114,11 @@ public class MigrationsJava {
Source<Integer, Promise<Option<Integer>>> src = Source.<Integer>maybe();
// Complete the promise with an empty option to emulate the old lazyEmpty
promise.trySuccess(scala.Option.empty());
final Source<String, Cancellable> sourceUnderTest = Source.tick(
FiniteDuration.create(0, TimeUnit.MILLISECONDS),
FiniteDuration.create(200, TimeUnit.MILLISECONDS),
"tick");
//#source-creators
//#empty-flow

View file

@ -213,17 +213,16 @@ Source constructor name changes
zero elements by providing an ``Option``. This is different from ``lazyEmpty`` which only allowed completion to be
sent, but no elements.
The ``apply()`` and ``from()`` overloads on ``Source`` that provide a tick source (``Source(delay,interval,tick)``)
are replaced by the named method ``Source.tick()`` to reduce the number of overloads and to make the function more
The ``from()`` overload on ``Source`` that provide a tick source (``Source.from(delay,interval,tick)``)
is replaced by the named method ``Source.tick()`` to reduce the number of overloads and to make the function more
discoverable.
Update procedure
----------------
1. Replace all uses of ``Source(delay,interval,tick)`` and ``Source.from(delay,interval,tick)`` with the method
``Source.tick()``
2. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with
1. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with
a ``None`` (an empty ``Option``)
2. Replace all uses of ``Source.from(delay,interval,tick)`` with the method ``Source.tick(delay,interval,tick)``
Example
^^^^^^^
@ -235,6 +234,12 @@ Example
//...
promise.trySuccess(BoxedUnit.UNIT);
// This no longer works!
final Source<String, Cancellable> sourceUnderTest = Source.from(
FiniteDuration.create(0, TimeUnit.MILLISECONDS),
FiniteDuration.create(200, TimeUnit.MILLISECONDS),
"tick");
should be replaced by
.. includecode:: code/docs/MigrationsJava.java#source-creators

View file

@ -106,8 +106,7 @@ class MigrationsScala extends AkkaSpec {
// This finishes the stream without emitting anything, just like Source.lazyEmpty did
promise.trySuccess(Some(()))
// FIXME: After https://github.com/akka/akka/pull/18792 merged
val ticks = Source(1.second, 3.seconds, "tick")
val ticks = Source.tick(1.second, 3.seconds, "tick")
//#source-creators
//#flatMapConcat

View file

@ -77,7 +77,7 @@ class FlowDocSpec extends AkkaSpec {
import scala.concurrent.duration._
case object Tick
val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () => Tick)
val timer = Source.tick(initialDelay = 1.second, interval = 1.seconds, tick = () => Tick)
val timerCancel: Cancellable = Sink.ignore.runWith(timer)
timerCancel.cancel()
@ -148,7 +148,7 @@ class FlowDocSpec extends AkkaSpec {
"various ways of transforming materialized values" in {
import scala.concurrent.duration._
val throttler = Flow.fromGraph(FlowGraph.create(Source(1.second, 1.second, "test")) { implicit builder =>
val throttler = Flow.fromGraph(FlowGraph.create(Source.tick(1.second, 1.second, "test")) { implicit builder =>
tickSource =>
import FlowGraph.Implicits._
val zip = builder.add(ZipWith[String, Int, Int](Keep.right))

View file

@ -44,9 +44,9 @@ class StreamBuffersRateSpec extends AkkaSpec {
val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count))
Source(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0
Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0
Source(initialDelay = 1.second, interval = 1.second, "message!")
Source.tick(initialDelay = 1.second, interval = 1.second, "message!")
.conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1
zipper.out ~> Sink.foreach(println)

View file

@ -66,7 +66,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
"sink actor ref" in {
//#sink-actorref
case object Tick
val sourceUnderTest = Source(0.seconds, 200.millis, Tick)
val sourceUnderTest = Source.tick(0.seconds, 200.millis, Tick)
val probe = TestProbe()
val cancellable = sourceUnderTest.to(Sink.actorRef(probe.ref, "completed")).run()

View file

@ -222,17 +222,16 @@ Source constructor name changes
zero elements by providing an ``Option``. This is different from ``lazyEmpty`` which only allowed completion to be
sent, but no elements.
The ``apply()`` and ``from()`` overloads on ``Source`` that provide a tick source (``Source(delay,interval,tick)``)
are replaced by the named method ``Source.tick()`` to reduce the number of overloads and to make the function more
The ``apply()`` overload on ``Source`` that provide a tick source (``Source(delay,interval,tick)``)
is replaced by the named method ``Source.tick()`` to reduce the number of overloads and to make the function more
discoverable.
Update procedure
----------------
1. Replace all uses of ``Source(delay,interval,tick)`` and ``Source.from(delay,interval,tick)`` with the method
``Source.tick()``
2. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with
1. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with
a ``None`` (an empty ``Option``)
2. Replace all uses of ``Source(delay,interval,tick)`` with the method ``Source.tick(delay,interval,tick)``
Example
^^^^^^^

View file

@ -125,7 +125,7 @@ private[http] object Websocket {
import FlowGraph.Implicits._
val split = b.add(BypassRouter)
val tick = Source(closeTimeout, closeTimeout, Tick)
val tick = Source.tick(closeTimeout, closeTimeout, Tick)
val merge = b.add(BypassMerge)
val messagePreparation = b.add(prepareMessages)
val messageRendering = b.add(renderMessages.via(LiftCompletions))

View file

@ -420,7 +420,7 @@ public class SourceTest extends StreamTest {
@Test
public void mustProduceTicks() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
Source<String, Cancellable> tickSource = Source.from(FiniteDuration.create(1, TimeUnit.SECONDS),
Source<String, Cancellable> tickSource = Source.tick(FiniteDuration.create(1, TimeUnit.SECONDS),
FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick");
Cancellable cancellable = tickSource.to(Sink.foreach(new Procedure<String>() {
public void apply(String elem) {

View file

@ -18,7 +18,7 @@ class TickSourceSpec extends AkkaSpec {
"A Flow based on tick publisher" must {
"produce ticks" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[String]()
Source(1.second, 500.millis, "tick").to(Sink(c)).run()
Source.tick(1.second, 500.millis, "tick").to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(3)
c.expectNoMsg(600.millis)
@ -33,7 +33,7 @@ class TickSourceSpec extends AkkaSpec {
"drop ticks when not requested" in {
val c = TestSubscriber.manualProbe[String]()
Source(1.second, 1.second, "tick").to(Sink(c)).run()
Source.tick(1.second, 1.second, "tick").to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(2)
c.expectNext("tick")
@ -49,7 +49,7 @@ class TickSourceSpec extends AkkaSpec {
}
"reject multiple subscribers, but keep the first" in {
val p = Source(1.second, 1.second, "tick").runWith(Sink.publisher)
val p = Source.tick(1.second, 1.second, "tick").runWith(Sink.publisher)
val c1 = TestSubscriber.manualProbe[String]()
val c2 = TestSubscriber.manualProbe[String]()
p.subscribe(c1)
@ -71,7 +71,7 @@ class TickSourceSpec extends AkkaSpec {
import FlowGraph.Implicits._
val zip = b.add(Zip[Int, String]())
Source(1 to 100) ~> zip.in0
Source(1.second, 1.second, "tick") ~> zip.in1
Source.tick(1.second, 1.second, "tick") ~> zip.in1
zip.out ~> Flow[(Int, String)].map { case (n, _) n } ~> Sink(c)
ClosedShape
}).run()
@ -87,7 +87,7 @@ class TickSourceSpec extends AkkaSpec {
"be possible to cancel" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[String]()
val tickSource = Source(1.second, 500.millis, "tick")
val tickSource = Source.tick(1.second, 500.millis, "tick")
val cancellable = tickSource.to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(3)

View file

@ -335,12 +335,10 @@ private[akka] object StreamLayout {
override def toString =
s"""
| Module: ${this.attributes.nameOrDefault("unnamed")}
| Modules: ${subModules.iterator.map(m " " + m.attributes.nameOrDefault(m.getClass.getName)).mkString("\n")}
| Downstreams:
| ${downstreams.iterator.map { case (in, out) s" $in -> $out" }.mkString("\n")}
| Upstreams:
| ${upstreams.iterator.map { case (out, in) s" $out -> $in" }.mkString("\n")}
""".stripMargin
| Modules: ${subModules.iterator.map(m "\n " + m.attributes.nameOrDefault(m.getClass.getName)).mkString("")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin
}
}

View file

@ -123,8 +123,8 @@ object Source {
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.Source[O, Cancellable] =
new Source(scaladsl.Source(initialDelay, interval, tick))
def tick[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.Source[O, Cancellable] =
new Source(scaladsl.Source.tick(initialDelay, interval, tick))
/**
* Create a `Source` with one element.

View file

@ -237,6 +237,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)
override def toString = s"""Flow(${module})"""
}
object Flow {

View file

@ -204,8 +204,8 @@ object Source {
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
fromGraph(new TickSource[T](initialDelay, interval, tick))
def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
fromGraph(new TickSource[T](initialDelay, interval, tick).withAttributes(DefaultAttributes.tickSource))
/**
* Create a `Source` with one element.