diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala index 8115ce4e29..6769fffb1c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWireTapSpec.scala @@ -5,24 +5,22 @@ package akka.stream.scaladsl import akka.Done - -import scala.util.control.NoStackTrace import akka.stream.ActorMaterializer -import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit._ -class FlowWireTapSpec extends StreamSpec { +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +class FlowWireTapSpec extends StreamSpec("akka.stream.materializer.debug.fuzzing-mode = off") { implicit val materializer = ActorMaterializer() import system.dispatcher "A wireTap" must { "call the procedure for each element" in assertAllStagesStopped { - Source(1 to 3).wireTap(x ⇒ { testActor ! x }).runWith(Sink.ignore).futureValue - expectMsg(1) - expectMsg(2) - expectMsg(3) + Source(1 to 100).wireTap(testActor ! _).runWith(Sink.ignore).futureValue + 1 to 100 foreach { i ⇒ expectMsg(i) } } "complete the future for an empty stream" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 8e1f57d15e..68769b5e6a 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -487,6 +487,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * This is a simplified version of `wireTap(Sink)` that takes only a simple procedure. * Elements will be passed into this "side channel" function, and any of its results will be ignored. * + * If the wire-tap operation is slow (it backpressures), elements that would've been sent to it will be dropped instead. + * * This operation is useful for inspecting the passed through element, usually by means of side-effecting * operations (such as `println`, or emitting metrics), for each element without having to modify it. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 6cc27fe17d..d41de9f30c 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1076,6 +1076,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * This is a simplified version of `wireTap(Sink)` that takes only a simple procedure. * Elements will be passed into this "side channel" function, and any of its results will be ignored. * + * If the wire-tap operation is slow (it backpressures), elements that would've been sent to it will be dropped instead. + * * This operation is useful for inspecting the passed through element, usually by means of side-effecting * operations (such as `println`, or emitting metrics), for each element without having to modify it. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index c40ea9fbcc..dd7076c50c 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -139,6 +139,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * This is a simplified version of `wireTap(Sink)` that takes only a simple procedure. * Elements will be passed into this "side channel" function, and any of its results will be ignored. * + * If the wire-tap operation is slow (it backpressures), elements that would've been sent to it will be dropped instead. + * * This operation is useful for inspecting the passed through element, usually by means of side-effecting * operations (such as `println`, or emitting metrics), for each element without having to modify it. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 3672c38b75..29224f7ba3 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -132,6 +132,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * This is a simplified version of `wireTap(Sink)` that takes only a simple procedure. * Elements will be passed into this "side channel" function, and any of its results will be ignored. * + * If the wire-tap operation is slow (it backpressures), elements that would've been sent to it will be dropped instead. + * * This operation is useful for inspecting the passed through element, usually by means of side-effecting * operations (such as `println`, or emitting metrics), for each element without having to modify it. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 861f71355d..5fabcd39d4 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -770,6 +770,8 @@ trait FlowOps[+Out, +Mat] { * This is a simplified version of `wireTap(Sink)` that takes only a simple function. * Elements will be passed into this "side channel" function, and any of its results will be ignored. * + * If the wire-tap operation is slow (it backpressures), elements that would've been sent to it will be dropped instead. + * * This operation is useful for inspecting the passed through element, usually by means of side-effecting * operations (such as `println`, or emitting metrics), for each element without having to modify it. *