+str add in-line wireTap operator for sideeffecting (#24610)
This commit is contained in:
parent
a3e52078df
commit
11a397d9c5
11 changed files with 181 additions and 12 deletions
|
|
@ -58,13 +58,13 @@ class FlowFoldAsyncSpec extends StreamSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"propagate an error" in assertAllStagesStopped {
|
"propagate an error" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runFoldAsync[NotUsed](NotUsed)(noneAsync)
|
val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runFoldAsync[NotUsed](NotUsed)(noneAsync)
|
||||||
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
"complete future with failure when folding function throws" in assertAllStagesStopped {
|
"complete future with failure when folding function throws" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val future = inputSource.runFoldAsync(0) { (x, y) ⇒
|
val future = inputSource.runFoldAsync(0) { (x, y) ⇒
|
||||||
if (x > 50) Future.failed(error) else Future(x + y)
|
if (x > 50) Future.failed(error) else Future(x + y)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,19 +46,19 @@ class FlowFoldSpec extends StreamSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"propagate an error" in assertAllStagesStopped {
|
"propagate an error" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runFold[NotUsed](NotUsed)(Keep.none)
|
val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runFold[NotUsed](NotUsed)(Keep.none)
|
||||||
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
"complete future with failure when the folding function throws and the supervisor strategy decides to stop" in assertAllStagesStopped {
|
"complete future with failure when the folding function throws and the supervisor strategy decides to stop" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val future = inputSource.runFold(0)((x, y) ⇒ if (x > 50) throw error else x + y)
|
val future = inputSource.runFold(0)((x, y) ⇒ if (x > 50) throw error else x + y)
|
||||||
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
"resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped {
|
"resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val fold = Sink.fold[Int, Int](0)((x, y) ⇒ if (y == 50) throw error else x + y)
|
val fold = Sink.fold[Int, Int](0)((x, y) ⇒ if (y == 50) throw error else x + y)
|
||||||
val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)))
|
val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)))
|
||||||
|
|
||||||
|
|
@ -66,7 +66,7 @@ class FlowFoldSpec extends StreamSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped {
|
"resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val fold = Sink.fold[Int, Int](0)((x, y) ⇒ if (y == 50) throw error else x + y)
|
val fold = Sink.fold[Int, Int](0)((x, y) ⇒ if (y == 50) throw error else x + y)
|
||||||
val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)))
|
val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ class FlowForeachSpec extends StreamSpec {
|
||||||
implicit val materializer = ActorMaterializer()
|
implicit val materializer = ActorMaterializer()
|
||||||
import system.dispatcher
|
import system.dispatcher
|
||||||
|
|
||||||
"A Foreach" must {
|
"A runForeach" must {
|
||||||
|
|
||||||
"call the procedure for each element" in assertAllStagesStopped {
|
"call the procedure for each element" in assertAllStagesStopped {
|
||||||
Source(1 to 3).runForeach(testActor ! _) foreach {
|
Source(1 to 3).runForeach(testActor ! _) foreach {
|
||||||
|
|
@ -48,7 +48,7 @@ class FlowForeachSpec extends StreamSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"complete future with failure when function throws" in assertAllStagesStopped {
|
"complete future with failure when function throws" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val future = Source.single(1).runForeach(_ ⇒ throw error)
|
val future = Source.single(1).runForeach(_ ⇒ throw error)
|
||||||
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,26 +45,26 @@ class FlowReduceSpec extends StreamSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"propagate an error" in assertAllStagesStopped {
|
"propagate an error" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runReduce(Keep.none)
|
val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runReduce(Keep.none)
|
||||||
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
"complete future with failure when reducing function throws and the supervisor strategy decides to stop" in assertAllStagesStopped {
|
"complete future with failure when reducing function throws and the supervisor strategy decides to stop" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val future = inputSource.runReduce[Int]((x, y) ⇒ if (x > 50) throw error else x + y)
|
val future = inputSource.runReduce[Int]((x, y) ⇒ if (x > 50) throw error else x + y)
|
||||||
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
"resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped {
|
"resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val reduce = Sink.reduce[Int]((x, y) ⇒ if (y == 50) throw error else x + y)
|
val reduce = Sink.reduce[Int]((x, y) ⇒ if (y == 50) throw error else x + y)
|
||||||
val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)))
|
val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)))
|
||||||
Await.result(future, 3.seconds) should be(expected - 50)
|
Await.result(future, 3.seconds) should be(expected - 50)
|
||||||
}
|
}
|
||||||
|
|
||||||
"resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped {
|
"resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped {
|
||||||
val error = new Exception with NoStackTrace
|
val error = TE("Boom!")
|
||||||
val reduce = Sink.reduce[Int]((x, y) ⇒ if (y == 50) throw error else x + y)
|
val reduce = Sink.reduce[Int]((x, y) ⇒ if (y == 50) throw error else x + y)
|
||||||
val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)))
|
val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)))
|
||||||
Await.result(future, 3.seconds) should be((51 to 100).sum)
|
Await.result(future, 3.seconds) should be((51 to 100).sum)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
|
|
||||||
|
import scala.util.control.NoStackTrace
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.stream.testkit._
|
||||||
|
import akka.stream.testkit.Utils._
|
||||||
|
|
||||||
|
class FlowWireTapSpec extends StreamSpec {
|
||||||
|
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
import system.dispatcher
|
||||||
|
|
||||||
|
"A wireTap" must {
|
||||||
|
|
||||||
|
"call the procedure for each element" in assertAllStagesStopped {
|
||||||
|
Source(1 to 3).wireTap(x ⇒ { testActor ! x }).runWith(Sink.ignore).futureValue
|
||||||
|
expectMsg(1)
|
||||||
|
expectMsg(2)
|
||||||
|
expectMsg(3)
|
||||||
|
}
|
||||||
|
|
||||||
|
"complete the future for an empty stream" in assertAllStagesStopped {
|
||||||
|
Source.empty[String].wireTap(testActor ! _).runWith(Sink.ignore) foreach {
|
||||||
|
_ ⇒ testActor ! "done"
|
||||||
|
}
|
||||||
|
expectMsg("done")
|
||||||
|
}
|
||||||
|
|
||||||
|
"yield the first error" in assertAllStagesStopped {
|
||||||
|
val p = TestPublisher.manualProbe[Int]()
|
||||||
|
Source.fromPublisher(p).wireTap(testActor ! _).runWith(Sink.ignore).failed foreach {
|
||||||
|
ex ⇒ testActor ! ex
|
||||||
|
}
|
||||||
|
val proc = p.expectSubscription()
|
||||||
|
proc.expectRequest()
|
||||||
|
val rte = new RuntimeException("ex") with NoStackTrace
|
||||||
|
proc.sendError(rte)
|
||||||
|
expectMsg(rte)
|
||||||
|
}
|
||||||
|
|
||||||
|
"not cause subsequent stages to be failed if throws (same as wireTap(Sink))" in assertAllStagesStopped {
|
||||||
|
val error = TE("Boom!")
|
||||||
|
val future = Source.single(1).wireTap(_ ⇒ throw error).runWith(Sink.ignore)
|
||||||
|
future.futureValue shouldEqual Done
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
# +str add in-line inspect operator for side effecting #24610
|
||||||
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap")
|
||||||
|
|
@ -483,6 +483,29 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
||||||
def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] =
|
def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] =
|
||||||
new Flow(delegate.map(f.apply))
|
new Flow(delegate.map(f.apply))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to [[map]], however does not modify the passed through element, the returned value is ignored.
|
||||||
|
* This is a simplified version of `wireTap(Sink)`, which you may use to wireTap a Sink onto this stream.
|
||||||
|
*
|
||||||
|
* This operation is useful for inspecting the passed through element, usually by means of side-effecting
|
||||||
|
* operations (such as `println`, or emitting metrics), for each element without having to modify it.
|
||||||
|
*
|
||||||
|
* For logging signals (elements, completion, error) consider using the [[log]] stage instead,
|
||||||
|
* along with appropriate `ActorAttributes.logLevels`.
|
||||||
|
*
|
||||||
|
* '''Emits when''' upstream emits an element; the same element will be passed to the attached function,
|
||||||
|
* as well as to the downstream stage
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def wireTap(f: function.Procedure[Out]): javadsl.Flow[In, Out, Mat] =
|
||||||
|
new Flow(delegate.wireTap(f(_)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform each input element into an `Iterable` of output elements that is
|
* Transform each input element into an `Iterable` of output elements that is
|
||||||
* then flattened into the output stream.
|
* then flattened into the output stream.
|
||||||
|
|
|
||||||
|
|
@ -843,6 +843,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
|
||||||
* '''Completes when''' upstream completes
|
* '''Completes when''' upstream completes
|
||||||
*
|
*
|
||||||
* '''Cancels when''' downstream cancels
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] =
|
def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] =
|
||||||
new Source(delegate.wireTap(that))
|
new Source(delegate.wireTap(that))
|
||||||
|
|
@ -1071,6 +1072,26 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
|
||||||
def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] =
|
def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] =
|
||||||
new Source(delegate.map(f.apply))
|
new Source(delegate.map(f.apply))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to [[map]], however does not modify the passed through element, the returned value is ignored.
|
||||||
|
*
|
||||||
|
* This operation is useful for inspecting the passed through element, usually by means of side-effecting
|
||||||
|
* operations (such as `println`, or emitting metrics), for each element without having to modify it.
|
||||||
|
*
|
||||||
|
* For logging signals (elements, completion, error) consider using the [[log]] stage instead,
|
||||||
|
* along with appropriate `ActorAttributes.createLogLevels`.
|
||||||
|
*
|
||||||
|
* '''Emits when''' upstream emits an element
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels; Note that failures of the `f` function will not cause cancellation
|
||||||
|
*/
|
||||||
|
def wireTap(f: function.Procedure[Out]): javadsl.Source[Out, Mat] =
|
||||||
|
new Source(delegate.wireTap(f(_)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recover allows to send last element on failure and gracefully complete the stream
|
* Recover allows to send last element on failure and gracefully complete the stream
|
||||||
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
||||||
|
|
|
||||||
|
|
@ -135,6 +135,29 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
|
||||||
def map[T](f: function.Function[Out, T]): SubFlow[In, T, Mat] =
|
def map[T](f: function.Function[Out, T]): SubFlow[In, T, Mat] =
|
||||||
new SubFlow(delegate.map(f.apply))
|
new SubFlow(delegate.map(f.apply))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to [[map]], however does not modify the passed through element, the returned value is ignored.
|
||||||
|
* This is a simplified version of `wireTap(Sink)`, which you may use to wireTap a Sink onto this stream.
|
||||||
|
*
|
||||||
|
* This operation is useful for inspecting the passed through element, usually by means of side-effecting
|
||||||
|
* operations (such as `println`, or emitting metrics), for each element without having to modify it.
|
||||||
|
*
|
||||||
|
* For logging signals (elements, completion, error) consider using the [[log]] stage instead,
|
||||||
|
* along with appropriate `ActorAttributes.logLevels`.
|
||||||
|
*
|
||||||
|
* '''Emits when''' upstream emits an element; the same element will be passed to the attached function,
|
||||||
|
* as well as to the downstream stage
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def wireTap(f: function.Procedure[Out]): SubFlow[In, Out, Mat] =
|
||||||
|
new SubFlow(delegate.wireTap(f(_)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform each input element into an `Iterable` of output elements that is
|
* Transform each input element into an `Iterable` of output elements that is
|
||||||
* then flattened into the output stream.
|
* then flattened into the output stream.
|
||||||
|
|
|
||||||
|
|
@ -128,6 +128,29 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
||||||
def map[T](f: function.Function[Out, T]): SubSource[T, Mat] =
|
def map[T](f: function.Function[Out, T]): SubSource[T, Mat] =
|
||||||
new SubSource(delegate.map(f.apply))
|
new SubSource(delegate.map(f.apply))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to [[map]], however does not modify the passed through element, the returned value is ignored.
|
||||||
|
* This is a simplified version of `wireTap(Sink)`, which you may use to wireTap a Sink onto this stream.
|
||||||
|
*
|
||||||
|
* This operation is useful for inspecting the passed through element, usually by means of side-effecting
|
||||||
|
* operations (such as `println`, or emitting metrics), for each element without having to modify it.
|
||||||
|
*
|
||||||
|
* For logging signals (elements, completion, error) consider using the [[log]] stage instead,
|
||||||
|
* along with appropriate `ActorAttributes.logLevels`.
|
||||||
|
*
|
||||||
|
* '''Emits when''' upstream emits an element; the same element will be passed to the attached function,
|
||||||
|
* as well as to the downstream stage
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def wireTap(f: function.Procedure[Out]): SubSource[Out, Mat] =
|
||||||
|
new SubSource(delegate.wireTap(f(_)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform each input element into an `Iterable` of output elements that is
|
* Transform each input element into an `Iterable` of output elements that is
|
||||||
* then flattened into the output stream.
|
* then flattened into the output stream.
|
||||||
|
|
|
||||||
|
|
@ -766,6 +766,29 @@ trait FlowOps[+Out, +Mat] {
|
||||||
*/
|
*/
|
||||||
def map[T](f: Out ⇒ T): Repr[T] = via(Map(f))
|
def map[T](f: Out ⇒ T): Repr[T] = via(Map(f))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to [[map]], however does not modify the passed through element, the returned value is ignored.
|
||||||
|
* This is a simplified version of `wireTap(Sink)`, which you may use to wireTap a Sink onto this stream.
|
||||||
|
*
|
||||||
|
* This operation is useful for inspecting the passed through element, usually by means of side-effecting
|
||||||
|
* operations (such as `println`, or emitting metrics), for each element without having to modify it.
|
||||||
|
*
|
||||||
|
* For logging signals (elements, completion, error) consider using the [[log]] stage instead,
|
||||||
|
* along with appropriate `ActorAttributes.logLevels`.
|
||||||
|
*
|
||||||
|
* '''Emits when''' upstream emits an element; the same element will be passed to the attached function,
|
||||||
|
* as well as to the downstream stage
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def wireTap(f: Out ⇒ Unit): Repr[Out] =
|
||||||
|
wireTap(Sink.foreach(f)).named("wireTap")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform each input element into an `Iterable` of output elements that is
|
* Transform each input element into an `Iterable` of output elements that is
|
||||||
* then flattened into the output stream.
|
* then flattened into the output stream.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue