From 5382014133d01cd38729d731763841680f93a42c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 20 Jul 2016 13:26:27 +0200 Subject: [PATCH] make Map a GraphStage --- .../impl/fusing/GraphInterpreterSpecKit.scala | 2 - .../stream/impl/fusing/InterpreterSpec.scala | 77 +++++++++++-------- .../impl/fusing/InterpreterStressSpec.scala | 2 +- .../fusing/InterpreterSupervisionSpec.scala | 47 ++++++----- .../impl/fusing/IteratorInterpreterSpec.scala | 10 +-- .../fusing/LifecycleInterpreterSpec.scala | 8 +- .../scala/akka/stream/scaladsl/FlowSpec.scala | 2 +- .../main/scala/akka/stream/impl/Stages.scala | 4 - .../scala/akka/stream/impl/fusing/Ops.scala | 29 ++++++- .../scala/akka/stream/scaladsl/Flow.scala | 2 +- 10 files changed, 108 insertions(+), 75 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index a6f060be2d..cf0a8f788c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -310,8 +310,6 @@ trait GraphInterpreterSpecKit extends StreamSpec { abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder { val ops = _ops.toArray - def this(op: Seq[Stage[_, _]], dummy: Int = 42) = this(op.map(_.toGS): _*) - val upstream = new UpstreamOneBoundedProbe[T] val downstream = new DownstreamOneBoundedPortProbe[T] var lastEvent = Set.empty[TestEvent] diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index f8b3526713..61d982801d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -7,8 +7,8 @@ import akka.stream.impl.ConstantFun import akka.stream.stage._ import akka.stream.testkit.StreamSpec import akka.testkit.EventFilter - import akka.stream._ +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { import Supervision.stoppingDecider @@ -24,7 +24,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "Interpreter" must { - "implement map correctly" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) { + "implement map correctly" in new OneBoundedSetup[Int](Map((x: Int) ⇒ x + 1)) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -43,10 +43,10 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnComplete)) } - "implement chain of maps correctly" in new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, stoppingDecider), - Map((x: Int) ⇒ x * 2, stoppingDecider), - Map((x: Int) ⇒ x + 1, stoppingDecider))) { + "implement chain of maps correctly" in new OneBoundedSetup[Int]( + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ x * 2), + Map((x: Int) ⇒ x + 1)) { lastEvents() should be(Set.empty) @@ -66,7 +66,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(Cancel)) } - "work with only boundary ops" in new OneBoundedSetup[Int](Seq.empty) { + "work with only boundary ops" in new OneBoundedSetup[Int]() { lastEvents() should be(Set.empty) downstream.requestOne() @@ -149,7 +149,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "implement take inside a chain" in new OneBoundedSetup[Int]( Filter((x: Int) ⇒ x != 0), takeTwo, - Map((x: Int) ⇒ x + 1, stoppingDecider).toGS) { + Map((x: Int) ⇒ x + 1)) { lastEvents() should be(Set.empty) @@ -433,11 +433,11 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { // Note, the new interpreter has no jumpback table, still did not want to remove the test "work with jumpback table and completed elements" in new OneBoundedSetup[Int]( - Map((x: Int) ⇒ x, stoppingDecider).toGS, - Map((x: Int) ⇒ x, stoppingDecider).toGS, + Map((x: Int) ⇒ x), + Map((x: Int) ⇒ x), KeepGoing(), - Map((x: Int) ⇒ x, stoppingDecider).toGS, - Map((x: Int) ⇒ x, stoppingDecider).toGS) { + Map((x: Int) ⇒ x), + Map((x: Int) ⇒ x)) { lastEvents() should be(Set.empty) @@ -464,8 +464,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } - "work with pushAndFinish if upstream completes with pushAndFinish" in new OneBoundedSetup[Int](Seq( - new PushFinishStage)) { + "work with pushAndFinish if upstream completes with pushAndFinish" in new OneBoundedSetup[Int](new PushFinishStage) { lastEvents() should be(Set.empty) @@ -476,10 +475,10 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(0), OnComplete)) } - "work with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[Int](Seq( - Map((x: Any) ⇒ x, stoppingDecider), + "work with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[Int]( + Map((x: Any) ⇒ x), new PushFinishStage, - Map((x: Any) ⇒ x, stoppingDecider))) { + Map((x: Any) ⇒ x)) { lastEvents() should be(Set.empty) @@ -491,7 +490,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } "work with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup[Int]( - (new PushFinishStage).toGS, + new PushFinishStage, Fold(0, (x: Int, y: Int) ⇒ x + y)) { lastEvents() should be(Set.empty) @@ -503,11 +502,18 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(1), OnComplete)) } - "report error if pull is called while op is terminating" in new OneBoundedSetup[Int](Seq(new PushPullStage[Any, Any] { - override def onPull(ctx: Context[Any]): SyncDirective = ctx.pull() - override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pull() - override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = ctx.absorbTermination() - })) { + "report error if pull is called while op is terminating" in new OneBoundedSetup[Int]( + new SimpleLinearGraphStage[Any] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = pull(in) + override def onPull(): Unit = pull(in) + override def onUpstreamFinish(): Unit = if (!hasBeenPulled(in)) pull(in) + + setHandlers(in, out, this) + } + } + ) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -558,8 +564,8 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = ctx.absorbTermination() } - "not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int](Seq( - new InvalidAbsorbTermination)) { + // This test must be kept since it tests the compatibility layer, which while is deprecated it is still here. + "not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int]((new InvalidAbsorbTermination).toGS) { lastEvents() should be(Set.empty) EventFilter[UnsupportedOperationException]("It is not allowed to call absorbTermination() from onDownstreamFinish.", occurrences = 1).intercept { @@ -635,16 +641,19 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { override val shape: FlowShape[T, T] = FlowShape(in, out) } - // This test is related to issue #17351 - private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends PushStage[Any, Any] { - override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = - ctx.pushAndFinish(elem) + private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends SimpleLinearGraphStage[Any] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = { + push(out, grab(in)) + completeStage() + } + override def onPull(): Unit = pull(in) + override def onUpstreamFinish(): Unit = failStage(akka.stream.testkit.Utils.TE("Cannot happen")) + override def postStop(): Unit = onPostStop() - override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = - ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen")) - - override def postStop(): Unit = - onPostStop() + setHandlers(in, out, this) + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala index d7ef645b1a..ff7895fe71 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -14,7 +14,7 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit { val halfLength = chainLength / 2 val repetition = 100 - val map = Map((x: Int) ⇒ x + 1, stoppingDecider).toGS + val map = Map((x: Int) ⇒ x + 1) // GraphStages can be reused val dropOne = Drop(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index 5e40d02b04..355c6453b7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -6,11 +6,9 @@ package akka.stream.impl.fusing import akka.stream.testkit.StreamSpec import scala.util.control.NoStackTrace -import akka.stream.Supervision -import akka.stream.stage.Context -import akka.stream.stage.PushPullStage -import akka.stream.stage.Stage -import akka.stream.stage.SyncDirective +import akka.stream.{ ActorAttributes, Attributes, Supervision } +import akka.stream.stage._ +import akka.testkit.AkkaSpec class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit { import Supervision.stoppingDecider @@ -21,16 +19,22 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit override def toString = "TE" } + class ResumingMap[In, Out](_f: In ⇒ Out) extends Map(_f) { + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + super.createLogic(inheritedAttributes.and(ActorAttributes.supervisionStrategy(resumingDecider))) + } + "Interpreter error handling" must { - "handle external failure" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) { + "handle external failure" in new OneBoundedSetup[Int](Map((x: Int) ⇒ x + 1)) { lastEvents() should be(Set.empty) upstream.onError(TE) lastEvents() should be(Set(OnError(TE))) } - "emit failure when op throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, stoppingDecider))) { + "emit failure when op throws" in new OneBoundedSetup[Int](Map((x: Int) ⇒ if (x == 0) throw TE else x)) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) upstream.onNext(2) @@ -42,10 +46,10 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(Cancel, OnError(TE))) } - "emit failure when op throws in middle of the chain" in new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, stoppingDecider), - Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, stoppingDecider), - Map((x: Int) ⇒ x + 100, stoppingDecider))) { + "emit failure when op throws in middle of the chain" in new OneBoundedSetup[Int]( + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ if (x == 0) throw TE else x + 10), + Map((x: Int) ⇒ x + 100)) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) @@ -58,7 +62,9 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(Cancel, OnError(TE))) } - "resume when Map throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, resumingDecider))) { + "resume when Map throws" in new OneBoundedSetup[Int]( + new ResumingMap((x: Int) ⇒ if (x == 0) throw TE else x) + ) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) upstream.onNext(2) @@ -82,10 +88,11 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(OnNext(4))) } - "resume when Map throws in middle of the chain" in new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, resumingDecider), - Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, resumingDecider), - Map((x: Int) ⇒ x + 100, resumingDecider))) { + "resume when Map throws in middle of the chain" in new OneBoundedSetup[Int]( + new ResumingMap((x: Int) ⇒ x + 1), + new ResumingMap((x: Int) ⇒ if (x == 0) throw TE else x + 10), + new ResumingMap((x: Int) ⇒ x + 100) + ) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) @@ -102,8 +109,8 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit } "resume when Map throws before Grouped" in new OneBoundedSetup[Int]( - Map((x: Int) ⇒ x + 1, resumingDecider).toGS, - Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider).toGS, + new ResumingMap((x: Int) ⇒ x + 1), + new ResumingMap((x: Int) ⇒ if (x <= 0) throw TE else x + 10), Grouped(3)) { downstream.requestOne() @@ -122,8 +129,8 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit } "complete after resume when Map throws before Grouped" in new OneBoundedSetup[Int]( - Map((x: Int) ⇒ x + 1, resumingDecider).toGS, - Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider).toGS, + new ResumingMap((x: Int) ⇒ x + 1), + new ResumingMap((x: Int) ⇒ if (x <= 0) throw TE else x + 10), Grouped(1000)) { downstream.requestOne() diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala index d991673ed0..c5e1906a2e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala @@ -16,14 +16,14 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "work in the happy case" in { val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq( - Map((x: Int) ⇒ x + 1, stoppingDecider).toGS)).iterator + Map((x: Int) ⇒ x + 1))).iterator itr.toSeq should be(2 to 11) } "hasNext should not affect elements" in { val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq( - Map((x: Int) ⇒ x, stoppingDecider).toGS)).iterator + Map((x: Int) ⇒ x))).iterator itr.hasNext should be(true) itr.hasNext should be(true) @@ -42,7 +42,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "throw exceptions on empty iterator" in { val itr = new IteratorInterpreter[Int, Int](List(1).iterator, Seq( - Map((x: Int) ⇒ x, stoppingDecider).toGS)).iterator + Map((x: Int) ⇒ x))).iterator itr.next() should be(1) a[NoSuchElementException] should be thrownBy { itr.next() } @@ -50,7 +50,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "throw exceptions when op in chain throws" in { val itr = new IteratorInterpreter[Int, Int](List(1, 2, 3).iterator, Seq( - Map((n: Int) ⇒ if (n == 2) throw new ArithmeticException() else n, stoppingDecider).toGS)).iterator + Map((n: Int) ⇒ if (n == 2) throw new ArithmeticException() else n))).iterator itr.next() should be(1) itr.hasNext should be(true) @@ -60,7 +60,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "work with an empty iterator" in { val itr = new IteratorInterpreter[Int, Int](Iterator.empty, Seq( - Map((x: Int) ⇒ x + 1, stoppingDecider).toGS)).iterator + Map((x: Int) ⇒ x + 1))).iterator itr.hasNext should be(false) a[NoSuchElementException] should be thrownBy { itr.next() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala index 9b3959c6a2..7faf0e68c9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala @@ -84,9 +84,9 @@ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } "onError when preStart fails with stages after" in new OneBoundedSetup[String]( - Map((x: Int) ⇒ x, stoppingDecider).toGS, + Map((x: Int) ⇒ x), PreStartFailer(() ⇒ throw TE("Boom!")), - Map((x: Int) ⇒ x, stoppingDecider).toGS) { + Map((x: Int) ⇒ x)) { lastEvents() should ===(Set(Cancel, OnError(TE("Boom!")))) } @@ -112,9 +112,9 @@ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } "postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[String]( - Map((x: Any) ⇒ x, stoppingDecider).toGS, + Map((x: Any) ⇒ x), new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"), - Map((x: Any) ⇒ x, stoppingDecider).toGS) { + Map((x: Any) ⇒ x)) { lastEvents() should be(Set.empty) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 98b1032fe2..10a6da89f8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -55,7 +55,7 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re } val faultyFlow: Flow[Any, Any, NotUsed] ⇒ Flow[Any, Any, NotUsed] = in ⇒ in.via({ - val stage = new PushPullGraphStage((_) ⇒ fusing.Map({ x: Any ⇒ x }, stoppingDecider), Attributes.none) + val stage = fusing.Map({ x: Any ⇒ x }) val assembly = new GraphAssembly( Array(stage), diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index c5c8371cd6..961a3b8a5a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -156,10 +156,6 @@ object Stages { } - final case class Map[In, Out](f: In ⇒ Out, attributes: Attributes = map) extends SymbolicStage[In, Out] { - override def create(attr: Attributes): Stage[In, Out] = fusing.Map(f, supervision(attr)) - } - final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy, attributes: Attributes = buffer) extends SymbolicStage[T, T] { require(size > 0, s"Buffer size must be larger than zero but was [$size]") override def create(attr: Attributes): Stage[T, T] = fusing.Buffer(size, overflowStrategy) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6fcbbb7c00..6336133c6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -25,10 +25,33 @@ import akka.stream.impl.Stages.DefaultAttributes /** * INTERNAL API */ -final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] { - override def onPush(elem: In, ctx: Context[Out]): SyncDirective = ctx.push(f(elem)) +// FIXME: Not final because InterpreterSupervisionSpec. Some better option is needed here +case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] { + val in = Inlet[In]("Map.in") + val out = Outlet[Out]("Map.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.map - override def decide(t: Throwable): Supervision.Directive = decider(t) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private def decider = + inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + + override def onPush(): Unit = { + try { + push(out, f(grab(in))) + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case _ ⇒ pull(in) + } + } + } + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index de56a2966f..de991e41a1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -475,7 +475,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * */ - def map[T](f: Out ⇒ T): Repr[T] = andThen(Map(f)) + def map[T](f: Out ⇒ T): Repr[T] = via(Map(f)) /** * Transform each input element into an `Iterable` of output elements that is