From f9911facc07716f3167f4e2678727fc0973326fb Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 9 Apr 2018 11:48:06 +0900 Subject: [PATCH] +str #24812 fix signature of monitor() * make monitor be a keep.both by default --- .../stream/scaladsl/FlowMonitorSpec.scala | 16 +++++------ .../mima-filters/2.5.11.backwards.excludes | 1 - .../mima-filters/2.5.12.backwards.excludes | 4 +-- .../mima-filters/2.5.16.backwards.excludes | 4 +++ .../main/scala/akka/stream/javadsl/Flow.scala | 28 ++++++++++++++++++- .../scala/akka/stream/javadsl/Source.scala | 28 +++++++++++++++++-- .../scala/akka/stream/javadsl/SubSource.scala | 3 +- .../scala/akka/stream/scaladsl/Flow.scala | 26 +++++++++++++++++ 8 files changed, 94 insertions(+), 16 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala index 2e530bf5be..643b0629de 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala @@ -20,7 +20,7 @@ class FlowMonitorSpec extends StreamSpec { "A FlowMonitor" must { "return Finished when stream is completed" in { val ((source, monitor), sink) = - TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() source.sendComplete() awaitAssert(monitor.state == Finished, 3.seconds) sink.expectSubscriptionAndComplete() @@ -28,14 +28,14 @@ class FlowMonitorSpec extends StreamSpec { "return Finished when stream is cancelled from downstream" in { val ((source, monitor), sink) = - TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() sink.cancel() awaitAssert(monitor.state == Finished, 3.seconds) } "return Failed when stream fails, and propagate the error" in { val ((source, monitor), sink) = - TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() val ex = new Exception("Source failed") source.sendError(ex) awaitAssert(monitor.state == Failed(ex), 3.seconds) @@ -44,7 +44,7 @@ class FlowMonitorSpec extends StreamSpec { "return Initialized for an empty stream" in { val ((source, monitor), sink) = - TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() awaitAssert(monitor.state == Initialized, 3.seconds) source.expectRequest() sink.expectSubscription() @@ -52,7 +52,7 @@ class FlowMonitorSpec extends StreamSpec { "return Received after receiving a message" in { val ((source, monitor), sink) = - TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() val msg = "message" source.sendNext(msg) sink.requestNext(msg) @@ -63,7 +63,7 @@ class FlowMonitorSpec extends StreamSpec { // (to avoid allocating an object for each message) doesn't introduce a bug "return Received after receiving a StreamState message" in { val ((source, monitor), sink) = - TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() val msg = Received("message") source.sendNext(msg) sink.requestNext(msg) @@ -72,8 +72,8 @@ class FlowMonitorSpec extends StreamSpec { "return Failed when stream is abruptly terminated" in { val mat = ActorMaterializer() - val (source, monitor) = - TestSource.probe[Any].monitor()(Keep.both).to(Sink.ignore).run()(mat) + val (source, monitor) = // notice that `monitor` is like a Keep.both + TestSource.probe[Any].monitor.to(Sink.ignore).run()(mat) mat.shutdown() awaitAssert( diff --git a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes index ac34b69d3f..12c8b2fe07 100644 --- a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes @@ -15,4 +15,3 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowO # #24699 throttle overload ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.throttle") - diff --git a/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes index d6cb6b0dcc..49e085d41f 100644 --- a/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes @@ -1,7 +1,7 @@ # +str add in-line inspect operator for side effecting #24610 ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap") -# #24758 recreate already closed substreams +# #24758 recreate already closed substreams ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupBy.this") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.groupBy") @@ -13,4 +13,4 @@ ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.io.TcpConnectionSt ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TcpConnectionStage#Inbound.apply") # #24891 add return type for FlowMonitorState.finished -ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.FlowMonitorState.finished") \ No newline at end of file +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.FlowMonitorState.finished") diff --git a/akka-stream/src/main/mima-filters/2.5.16.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.16.backwards.excludes index da2e35d88e..0a94d182b7 100644 --- a/akka-stream/src/main/mima-filters/2.5.16.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.16.backwards.excludes @@ -2,3 +2,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl.*") ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl$") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.StreamRefSettings.*") + +# #24812 fix signature of monitor() +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitorMat") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index a272bbd218..c87996b332 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -2991,10 +2991,36 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an * event, and may therefor affect performance. + * * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. */ + @Deprecated + @deprecated("Use monitor() or monitorMat(combine) instead", "2.5.17") def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] = - new Flow(delegate.monitor()(combinerToScala(combine))) + new Flow(delegate.monitorMat(combinerToScala(combine))) + + /** + * Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + * + * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. + */ + def monitorMat[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] = + new Flow(delegate.monitorMat(combinerToScala(combine))) + + /** + * Materializes to `Pair>`, which is unlike most other operators (!), + * in which usually the default materialized value keeping semantics is to keep the left value + * (by passing `Keep.left()` to a `*Mat` version of a method). This operator is an exception from + * that rule and keeps both values since dropping its sole purpose is to introduce that materialized value. + * + * The `FlowMonitor[Out]` allows monitoring of the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + */ + def monitor(): Flow[In, Out, Pair[Mat, FlowMonitor[Out]]] = + monitorMat(Keep.both) /** * Delays the initial element by the specified duration. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 3d84d9cde8..09d74df339 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3189,14 +3189,38 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] = new Source(delegate.watchTermination()((left, right) ⇒ matF(left, right.toJava))) + /** + * Materializes to `FlowMonitor` that allows monitoring of the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. + */ + @Deprecated + @deprecated("Use monitor() or monitorMat(combine) instead", "2.5.17") + def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] = + new Source(delegate.monitorMat(combinerToScala(combine))) + /** * Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an * event, and may therefor affect performance. * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. */ - def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] = - new Source(delegate.monitor()(combinerToScala(combine))) + def monitorMat[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] = + new Source(delegate.monitorMat(combinerToScala(combine))) + + /** + * Materializes to `Pair>`, which is unlike most other operators (!), + * in which usually the default materialized value keeping semantics is to keep the left value + * (by passing `Keep.left()` to a `*Mat` version of a method). This operator is an exception from + * that rule and keeps both values since dropping its sole purpose is to introduce that materialized value. + * + * The `FlowMonitor` allows monitoring of the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + */ + def monitor(): Source[Out, Pair[Mat, FlowMonitor[Out]]] = + monitorMat(Keep.both) /** * Delays the initial element by the specified duration. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index c467e11c06..1ed87d45a9 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -6,8 +6,7 @@ package akka.stream.javadsl import akka.NotUsed import akka.event.LoggingAdapter -import akka.japi.function -import akka.japi.Util +import akka.japi.{ Pair, Util, function } import akka.stream._ import akka.util.ConstantFun import akka.util.JavaDurationConverters._ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index b8147fec64..e791ee660c 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2980,9 +2980,35 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an * event, and may therefor affect performance. + * * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. */ + @Deprecated + @deprecated("Use monitor() or monitorMat(combine) instead", "2.5.17") def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2] = viaMat(GraphStages.monitor)(combine) + /** + * Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + * + * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. + */ + def monitorMat[Mat2](combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2] = + viaMat(GraphStages.monitor)(combine) + + /** + * Materializes to `(Mat, FlowMonitor[Out])`, which is unlike most other operators (!), + * in which usually the default materialized value keeping semantics is to keep the left value + * (by passing `Keep.left()` to a `*Mat` version of a method). This operator is an exception from + * that rule and keeps both values since dropping its sole purpose is to introduce that materialized value. + * + * The `FlowMonitor[Out]` allows monitoring of the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + */ + def monitor: ReprMat[Out, (Mat, FlowMonitor[Out])] = + monitorMat(Keep.both) + }