+str #24812 fix signature of monitor()
* make monitor be a keep.both by default
This commit is contained in:
parent
f06595dd5d
commit
f9911facc0
8 changed files with 94 additions and 16 deletions
|
|
@ -20,7 +20,7 @@ class FlowMonitorSpec extends StreamSpec {
|
||||||
"A FlowMonitor" must {
|
"A FlowMonitor" must {
|
||||||
"return Finished when stream is completed" in {
|
"return Finished when stream is completed" in {
|
||||||
val ((source, monitor), sink) =
|
val ((source, monitor), sink) =
|
||||||
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
||||||
source.sendComplete()
|
source.sendComplete()
|
||||||
awaitAssert(monitor.state == Finished, 3.seconds)
|
awaitAssert(monitor.state == Finished, 3.seconds)
|
||||||
sink.expectSubscriptionAndComplete()
|
sink.expectSubscriptionAndComplete()
|
||||||
|
|
@ -28,14 +28,14 @@ class FlowMonitorSpec extends StreamSpec {
|
||||||
|
|
||||||
"return Finished when stream is cancelled from downstream" in {
|
"return Finished when stream is cancelled from downstream" in {
|
||||||
val ((source, monitor), sink) =
|
val ((source, monitor), sink) =
|
||||||
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
||||||
sink.cancel()
|
sink.cancel()
|
||||||
awaitAssert(monitor.state == Finished, 3.seconds)
|
awaitAssert(monitor.state == Finished, 3.seconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
"return Failed when stream fails, and propagate the error" in {
|
"return Failed when stream fails, and propagate the error" in {
|
||||||
val ((source, monitor), sink) =
|
val ((source, monitor), sink) =
|
||||||
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
||||||
val ex = new Exception("Source failed")
|
val ex = new Exception("Source failed")
|
||||||
source.sendError(ex)
|
source.sendError(ex)
|
||||||
awaitAssert(monitor.state == Failed(ex), 3.seconds)
|
awaitAssert(monitor.state == Failed(ex), 3.seconds)
|
||||||
|
|
@ -44,7 +44,7 @@ class FlowMonitorSpec extends StreamSpec {
|
||||||
|
|
||||||
"return Initialized for an empty stream" in {
|
"return Initialized for an empty stream" in {
|
||||||
val ((source, monitor), sink) =
|
val ((source, monitor), sink) =
|
||||||
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
||||||
awaitAssert(monitor.state == Initialized, 3.seconds)
|
awaitAssert(monitor.state == Initialized, 3.seconds)
|
||||||
source.expectRequest()
|
source.expectRequest()
|
||||||
sink.expectSubscription()
|
sink.expectSubscription()
|
||||||
|
|
@ -52,7 +52,7 @@ class FlowMonitorSpec extends StreamSpec {
|
||||||
|
|
||||||
"return Received after receiving a message" in {
|
"return Received after receiving a message" in {
|
||||||
val ((source, monitor), sink) =
|
val ((source, monitor), sink) =
|
||||||
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
||||||
val msg = "message"
|
val msg = "message"
|
||||||
source.sendNext(msg)
|
source.sendNext(msg)
|
||||||
sink.requestNext(msg)
|
sink.requestNext(msg)
|
||||||
|
|
@ -63,7 +63,7 @@ class FlowMonitorSpec extends StreamSpec {
|
||||||
// (to avoid allocating an object for each message) doesn't introduce a bug
|
// (to avoid allocating an object for each message) doesn't introduce a bug
|
||||||
"return Received after receiving a StreamState message" in {
|
"return Received after receiving a StreamState message" in {
|
||||||
val ((source, monitor), sink) =
|
val ((source, monitor), sink) =
|
||||||
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
TestSource.probe[Any].monitorMat(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
|
||||||
val msg = Received("message")
|
val msg = Received("message")
|
||||||
source.sendNext(msg)
|
source.sendNext(msg)
|
||||||
sink.requestNext(msg)
|
sink.requestNext(msg)
|
||||||
|
|
@ -72,8 +72,8 @@ class FlowMonitorSpec extends StreamSpec {
|
||||||
|
|
||||||
"return Failed when stream is abruptly terminated" in {
|
"return Failed when stream is abruptly terminated" in {
|
||||||
val mat = ActorMaterializer()
|
val mat = ActorMaterializer()
|
||||||
val (source, monitor) =
|
val (source, monitor) = // notice that `monitor` is like a Keep.both
|
||||||
TestSource.probe[Any].monitor()(Keep.both).to(Sink.ignore).run()(mat)
|
TestSource.probe[Any].monitor.to(Sink.ignore).run()(mat)
|
||||||
mat.shutdown()
|
mat.shutdown()
|
||||||
|
|
||||||
awaitAssert(
|
awaitAssert(
|
||||||
|
|
|
||||||
|
|
@ -15,4 +15,3 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowO
|
||||||
|
|
||||||
# #24699 throttle overload
|
# #24699 throttle overload
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.throttle")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.throttle")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
# +str add in-line inspect operator for side effecting #24610
|
# +str add in-line inspect operator for side effecting #24610
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
|
||||||
|
|
||||||
# #24758 recreate already closed substreams
|
# #24758 recreate already closed substreams
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupBy.this")
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GroupBy.this")
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.groupBy")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.groupBy")
|
||||||
|
|
||||||
|
|
@ -13,4 +13,4 @@ ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.io.TcpConnectionSt
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TcpConnectionStage#Inbound.apply")
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.TcpConnectionStage#Inbound.apply")
|
||||||
|
|
||||||
# #24891 add return type for FlowMonitorState.finished
|
# #24891 add return type for FlowMonitorState.finished
|
||||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.FlowMonitorState.finished")
|
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.FlowMonitorState.finished")
|
||||||
|
|
|
||||||
|
|
@ -2,3 +2,7 @@
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl.*")
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl.*")
|
||||||
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl$")
|
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.streamref.StreamRefSettingsImpl$")
|
||||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.StreamRefSettings.*")
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.StreamRefSettings.*")
|
||||||
|
|
||||||
|
# #24812 fix signature of monitor()
|
||||||
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitorMat")
|
||||||
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor")
|
||||||
|
|
|
||||||
|
|
@ -2991,10 +2991,36 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
||||||
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
|
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
|
||||||
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
* event, and may therefor affect performance.
|
* event, and may therefor affect performance.
|
||||||
|
*
|
||||||
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use monitor() or monitorMat(combine) instead", "2.5.17")
|
||||||
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
|
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
|
||||||
new Flow(delegate.monitor()(combinerToScala(combine)))
|
new Flow(delegate.monitorMat(combinerToScala(combine)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
|
||||||
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
|
* event, and may therefor affect performance.
|
||||||
|
*
|
||||||
|
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
||||||
|
*/
|
||||||
|
def monitorMat[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
|
||||||
|
new Flow(delegate.monitorMat(combinerToScala(combine)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Materializes to `Pair<Mat, FlowMonitor<<Out>>`, which is unlike most other operators (!),
|
||||||
|
* in which usually the default materialized value keeping semantics is to keep the left value
|
||||||
|
* (by passing `Keep.left()` to a `*Mat` version of a method). This operator is an exception from
|
||||||
|
* that rule and keeps both values since dropping its sole purpose is to introduce that materialized value.
|
||||||
|
*
|
||||||
|
* The `FlowMonitor[Out]` allows monitoring of the current flow. All events are propagated
|
||||||
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
|
* event, and may therefor affect performance.
|
||||||
|
*/
|
||||||
|
def monitor(): Flow[In, Out, Pair[Mat, FlowMonitor[Out]]] =
|
||||||
|
monitorMat(Keep.both)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delays the initial element by the specified duration.
|
* Delays the initial element by the specified duration.
|
||||||
|
|
|
||||||
|
|
@ -3189,14 +3189,38 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
|
||||||
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
|
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
|
||||||
new Source(delegate.watchTermination()((left, right) ⇒ matF(left, right.toJava)))
|
new Source(delegate.watchTermination()((left, right) ⇒ matF(left, right.toJava)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Materializes to `FlowMonitor<Out>` that allows monitoring of the current flow. All events are propagated
|
||||||
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
|
* event, and may therefor affect performance.
|
||||||
|
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use monitor() or monitorMat(combine) instead", "2.5.17")
|
||||||
|
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
|
||||||
|
new Source(delegate.monitorMat(combinerToScala(combine)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
|
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
|
||||||
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
* event, and may therefor affect performance.
|
* event, and may therefor affect performance.
|
||||||
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
||||||
*/
|
*/
|
||||||
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
|
def monitorMat[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
|
||||||
new Source(delegate.monitor()(combinerToScala(combine)))
|
new Source(delegate.monitorMat(combinerToScala(combine)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Materializes to `Pair<Mat, FlowMonitor<<Out>>`, which is unlike most other operators (!),
|
||||||
|
* in which usually the default materialized value keeping semantics is to keep the left value
|
||||||
|
* (by passing `Keep.left()` to a `*Mat` version of a method). This operator is an exception from
|
||||||
|
* that rule and keeps both values since dropping its sole purpose is to introduce that materialized value.
|
||||||
|
*
|
||||||
|
* The `FlowMonitor` allows monitoring of the current flow. All events are propagated
|
||||||
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
|
* event, and may therefor affect performance.
|
||||||
|
*/
|
||||||
|
def monitor(): Source[Out, Pair[Mat, FlowMonitor[Out]]] =
|
||||||
|
monitorMat(Keep.both)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delays the initial element by the specified duration.
|
* Delays the initial element by the specified duration.
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,7 @@ package akka.stream.javadsl
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
import akka.japi.function
|
import akka.japi.{ Pair, Util, function }
|
||||||
import akka.japi.Util
|
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
import akka.util.ConstantFun
|
import akka.util.ConstantFun
|
||||||
import akka.util.JavaDurationConverters._
|
import akka.util.JavaDurationConverters._
|
||||||
|
|
|
||||||
|
|
@ -2980,9 +2980,35 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
|
||||||
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
|
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
|
||||||
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
* event, and may therefor affect performance.
|
* event, and may therefor affect performance.
|
||||||
|
*
|
||||||
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("Use monitor() or monitorMat(combine) instead", "2.5.17")
|
||||||
def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2] =
|
def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2] =
|
||||||
viaMat(GraphStages.monitor)(combine)
|
viaMat(GraphStages.monitor)(combine)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
|
||||||
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
|
* event, and may therefor affect performance.
|
||||||
|
*
|
||||||
|
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
|
||||||
|
*/
|
||||||
|
def monitorMat[Mat2](combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2] =
|
||||||
|
viaMat(GraphStages.monitor)(combine)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Materializes to `(Mat, FlowMonitor[Out])`, which is unlike most other operators (!),
|
||||||
|
* in which usually the default materialized value keeping semantics is to keep the left value
|
||||||
|
* (by passing `Keep.left()` to a `*Mat` version of a method). This operator is an exception from
|
||||||
|
* that rule and keeps both values since dropping its sole purpose is to introduce that materialized value.
|
||||||
|
*
|
||||||
|
* The `FlowMonitor[Out]` allows monitoring of the current flow. All events are propagated
|
||||||
|
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
|
||||||
|
* event, and may therefor affect performance.
|
||||||
|
*/
|
||||||
|
def monitor: ReprMat[Out, (Mat, FlowMonitor[Out])] =
|
||||||
|
monitorMat(Keep.both)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue