diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala index b07c01ef25..810791f9ab 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala @@ -58,13 +58,13 @@ class FlowFoldAsyncSpec extends StreamSpec { } "propagate an error" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runFoldAsync[NotUsed](NotUsed)(noneAsync) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } "complete future with failure when folding function throws" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val future = inputSource.runFoldAsync(0) { (x, y) ⇒ if (x > 50) Future.failed(error) else Future(x + y) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index eec0e2f280..32a69e961e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -46,19 +46,19 @@ class FlowFoldSpec extends StreamSpec { } "propagate an error" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runFold[NotUsed](NotUsed)(Keep.none) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } "complete future with failure when the folding function throws and the supervisor strategy decides to stop" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val future = inputSource.runFold(0)((x, y) ⇒ if (x > 50) throw error else x + y) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } "resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val fold = Sink.fold[Int, Int](0)((x, y) ⇒ if (y == 50) throw error else x + y) val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))) @@ -66,7 +66,7 @@ class FlowFoldSpec extends StreamSpec { } "resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val fold = Sink.fold[Int, Int](0)((x, y) ⇒ if (y == 50) throw error else x + y) val future = inputSource.runWith(fold.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala index 23e8540969..fbfbdc3b74 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala @@ -16,7 +16,7 @@ class FlowForeachSpec extends StreamSpec { implicit val materializer = ActorMaterializer() import system.dispatcher - "A Foreach" must { + "A runForeach" must { "call the procedure for each element" in assertAllStagesStopped { Source(1 to 3).runForeach(testActor ! _) foreach { @@ -48,7 +48,7 @@ class FlowForeachSpec extends StreamSpec { } "complete future with failure when function throws" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val future = Source.single(1).runForeach(_ ⇒ throw error) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala index 93846d2339..262a708c02 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala @@ -45,26 +45,26 @@ class FlowReduceSpec extends StreamSpec { } "propagate an error" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runReduce(Keep.none) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } "complete future with failure when reducing function throws and the supervisor strategy decides to stop" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val future = inputSource.runReduce[Int]((x, y) ⇒ if (x > 50) throw error else x + y) the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } "resume with the accumulated state when the folding function throws and the supervisor strategy decides to resume" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val reduce = Sink.reduce[Int]((x, y) ⇒ if (y == 50) throw error else x + y) val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))) Await.result(future, 3.seconds) should be(expected - 50) } "resume and reset the state when the folding function throws when the supervisor strategy decides to restart" in assertAllStagesStopped { - val error = new Exception with NoStackTrace + val error = TE("Boom!") val reduce = Sink.reduce[Int]((x, y) ⇒ if (y == 50) throw error else x + y) val future = inputSource.runWith(reduce.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))) Await.result(future, 3.seconds) should be((51 to 100).sum) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala new file mode 100644 index 0000000000..8115ce4e29 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.Done + +import scala.util.control.NoStackTrace +import akka.stream.ActorMaterializer +import akka.stream.testkit._ +import akka.stream.testkit.Utils._ + +class FlowWireTapSpec extends StreamSpec { + + implicit val materializer = ActorMaterializer() + import system.dispatcher + + "A wireTap" must { + + "call the procedure for each element" in assertAllStagesStopped { + Source(1 to 3).wireTap(x ⇒ { testActor ! x }).runWith(Sink.ignore).futureValue + expectMsg(1) + expectMsg(2) + expectMsg(3) + } + + "complete the future for an empty stream" in assertAllStagesStopped { + Source.empty[String].wireTap(testActor ! _).runWith(Sink.ignore) foreach { + _ ⇒ testActor ! "done" + } + expectMsg("done") + } + + "yield the first error" in assertAllStagesStopped { + val p = TestPublisher.manualProbe[Int]() + Source.fromPublisher(p).wireTap(testActor ! _).runWith(Sink.ignore).failed foreach { + ex ⇒ testActor ! ex + } + val proc = p.expectSubscription() + proc.expectRequest() + val rte = new RuntimeException("ex") with NoStackTrace + proc.sendError(rte) + expectMsg(rte) + } + + "not cause subsequent stages to be failed if throws (same as wireTap(Sink))" in assertAllStagesStopped { + val error = TE("Boom!") + val future = Source.single(1).wireTap(_ ⇒ throw error).runWith(Sink.ignore) + future.futureValue shouldEqual Done + } + } + +} diff --git a/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes new file mode 100644 index 0000000000..c105fe6a6b --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.12.backwards.excludes @@ -0,0 +1,2 @@ +# +str add in-line inspect operator for side effecting #24610 +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.wireTap") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b37b30e2c8..a6ff990206 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -483,6 +483,29 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.map(f.apply)) + /** + * Similar to [[map]], however does not modify the passed through element, the returned value is ignored. + * This is a simplified version of `wireTap(Sink)`, which you may use to wireTap a Sink onto this stream. + * + * This operation is useful for inspecting the passed through element, usually by means of side-effecting + * operations (such as `println`, or emitting metrics), for each element without having to modify it. + * + * For logging signals (elements, completion, error) consider using the [[log]] stage instead, + * along with appropriate `ActorAttributes.logLevels`. + * + * '''Emits when''' upstream emits an element; the same element will be passed to the attached function, + * as well as to the downstream stage + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def wireTap(f: function.Procedure[Out]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.wireTap(f(_))) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 3f49e9787e..3199d9586b 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -843,6 +843,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * */ def wireTap(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] = new Source(delegate.wireTap(that)) @@ -1071,6 +1072,26 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] = new Source(delegate.map(f.apply)) + /** + * Similar to [[map]], however does not modify the passed through element, the returned value is ignored. + * + * This operation is useful for inspecting the passed through element, usually by means of side-effecting + * operations (such as `println`, or emitting metrics), for each element without having to modify it. + * + * For logging signals (elements, completion, error) consider using the [[log]] stage instead, + * along with appropriate `ActorAttributes.createLogLevels`. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels; Note that failures of the `f` function will not cause cancellation + */ + def wireTap(f: function.Procedure[Out]): javadsl.Source[Out, Mat] = + new Source(delegate.wireTap(f(_))) + /** * Recover allows to send last element on failure and gracefully complete the stream * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index a3622aaf00..b09ac8a3cf 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -135,6 +135,29 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I def map[T](f: function.Function[Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.map(f.apply)) + /** + * Similar to [[map]], however does not modify the passed through element, the returned value is ignored. + * This is a simplified version of `wireTap(Sink)`, which you may use to wireTap a Sink onto this stream. + * + * This operation is useful for inspecting the passed through element, usually by means of side-effecting + * operations (such as `println`, or emitting metrics), for each element without having to modify it. + * + * For logging signals (elements, completion, error) consider using the [[log]] stage instead, + * along with appropriate `ActorAttributes.logLevels`. + * + * '''Emits when''' upstream emits an element; the same element will be passed to the attached function, + * as well as to the downstream stage + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def wireTap(f: function.Procedure[Out]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.wireTap(f(_))) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 3975dc295b..6770ceffbd 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -128,6 +128,29 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O def map[T](f: function.Function[Out, T]): SubSource[T, Mat] = new SubSource(delegate.map(f.apply)) + /** + * Similar to [[map]], however does not modify the passed through element, the returned value is ignored. + * This is a simplified version of `wireTap(Sink)`, which you may use to wireTap a Sink onto this stream. + * + * This operation is useful for inspecting the passed through element, usually by means of side-effecting + * operations (such as `println`, or emitting metrics), for each element without having to modify it. + * + * For logging signals (elements, completion, error) consider using the [[log]] stage instead, + * along with appropriate `ActorAttributes.logLevels`. + * + * '''Emits when''' upstream emits an element; the same element will be passed to the attached function, + * as well as to the downstream stage + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def wireTap(f: function.Procedure[Out]): SubSource[Out, Mat] = + new SubSource(delegate.wireTap(f(_))) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c8b93cca6b..007bb942f2 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -766,6 +766,29 @@ trait FlowOps[+Out, +Mat] { */ def map[T](f: Out ⇒ T): Repr[T] = via(Map(f)) + /** + * Similar to [[map]], however does not modify the passed through element, the returned value is ignored. + * This is a simplified version of `wireTap(Sink)`, which you may use to wireTap a Sink onto this stream. + * + * This operation is useful for inspecting the passed through element, usually by means of side-effecting + * operations (such as `println`, or emitting metrics), for each element without having to modify it. + * + * For logging signals (elements, completion, error) consider using the [[log]] stage instead, + * along with appropriate `ActorAttributes.logLevels`. + * + * '''Emits when''' upstream emits an element; the same element will be passed to the attached function, + * as well as to the downstream stage + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def wireTap(f: Out ⇒ Unit): Repr[Out] = + wireTap(Sink.foreach(f)).named("wireTap") + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream.