diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 2fb1662f1b..c019b19cd2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -40,48 +40,6 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re val identity: Flow[Any, Any, NotUsed] ⇒ Flow[Any, Any, NotUsed] = in ⇒ in.map(e ⇒ e) val identity2: Flow[Any, Any, NotUsed] ⇒ Flow[Any, Any, NotUsed] = in ⇒ identity(in) - // TODO: Reenable these tests - // class BrokenActorInterpreter(_shell: GraphInterpreterShell, brokenMessage: Any) - // extends ActorGraphInterpreter(_shell) { - // - // override protected[akka] def aroundReceive(receive: Receive, msg: Any) = { - // msg match { - // case ActorGraphInterpreter.OnNext(_, 0, m) if m == brokenMessage ⇒ - // throw new NullPointerException(s"I'm so broken [$m]") - // case _ ⇒ super.aroundReceive(receive, msg) - // } - // } - // } - - // val faultyFlow: Flow[Any, Any, NotUsed] ⇒ Flow[Any, Any, NotUsed] = in ⇒ in.via({ - // val stage = fusing.Map({ x: Any ⇒ x }) - // - // val assembly = new GraphAssembly( - // Array(stage), - // Array(Attributes.none), - // Array(stage.shape.in, null), - // Array(0, -1), - // Array(null, stage.shape.out), - // Array(-1, 0)) - // - // val (connections, logics) = - // assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - // - // val shell = new GraphInterpreterShell(assembly, connections, logics, stage.shape, settings, - // materializer.asInstanceOf[ActorMaterializerImpl]) - // - // val props = Props(new BrokenActorInterpreter(shell, "a3")) - // .withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local) - // val impl = system.actorOf(props, "borken-stage-actor") - // - // val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, 0) - // val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(shell, 0) } - // - // impl ! ActorGraphInterpreter.ExposedPublisher(shell, 0, publisher) - // - // Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)) - // }) - val toPublisher: (Source[Any, _], ActorMaterializer) ⇒ Publisher[Any] = (f, m) ⇒ f.runWith(Sink.asPublisher(false))(m) @@ -535,67 +493,6 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10) } } - // - // "A broken Flow" must { - // "cancel upstream and call onError on current and future downstream subscribers if an internal error occurs" in { - // new ChainSetup(faultyFlow, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(16)) { - // - // def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = { - // val error = sprobe.expectError() - // error.isInstanceOf[AbruptTerminationException] should be(true) - // error.getMessage should startWith("Processor actor") - // } - // - // val downstream2 = TestSubscriber.manualProbe[Any]() - // publisher.subscribe(downstream2) - // val downstream2Subscription = downstream2.expectSubscription() - // - // downstreamSubscription.request(5) - // downstream2Subscription.request(5) - // upstream.expectRequest(upstreamSubscription, 1) - // upstreamSubscription.sendNext("a1") - // downstream.expectNext("a1") - // downstream2.expectNext("a1") - // - // upstream.expectRequest(upstreamSubscription, 1) - // upstreamSubscription.sendNext("a2") - // downstream.expectNext("a2") - // downstream2.expectNext("a2") - // - // val filters = immutable.Seq( - // EventFilter[NullPointerException](), - // EventFilter[IllegalStateException](), - // EventFilter[PostRestartException]()) // This is thrown because we attach the dummy failing actor to toplevel - // try { - // system.eventStream.publish(Mute(filters)) - // - // upstream.expectRequest(upstreamSubscription, 1) - // upstreamSubscription.sendNext("a3") - // upstreamSubscription.expectCancellation() - // - // // IllegalStateException terminated abruptly - // checkError(downstream) - // checkError(downstream2) - // - // val downstream3 = TestSubscriber.manualProbe[Any]() - // publisher.subscribe(downstream3) - // downstream3.expectSubscription() - // // IllegalStateException terminated abruptly - // checkError(downstream3) - // } finally { - // system.eventStream.publish(UnMute(filters)) - // } - // } - // } - // - // "suitably override attribute handling methods" in { - // import Attributes._ - // val f: Flow[Int, Int, NotUsed] = Flow[Int].map(_ + 1).async.addAttributes(none).named("name") - // - // f.module.attributes.getFirst[Name] shouldEqual Some(Name("name")) - // f.module.attributes.getFirst[Attributes.AsyncBoundary.type] shouldEqual Some(AsyncBoundary) - // } - // } object TestException extends RuntimeException with NoStackTrace diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 753940e971..0dc573d665 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -14,7 +14,7 @@ import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, Batc import akka.stream.impl.fusing.GraphInterpreter.Connection import akka.stream.impl.fusing._ import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } -import org.reactivestreams.{ Publisher, Subscriber, Subscription } +import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.collection.immutable.Map import scala.concurrent.duration.FiniteDuration @@ -39,6 +39,10 @@ object PhasedFusingActorMaterializer { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] = new SourceModulePhase(materializer).asInstanceOf[PhaseIsland[Any]] }, + ProcessorModuleIslandTag → new Phase[Any] { + override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] = + new ProcessorModulePhase(materializer).asInstanceOf[PhaseIsland[Any]] + }, GraphStageTag → DefaultPhase ) @@ -699,3 +703,24 @@ final class SinkModulePhase(materializer: PhasedFusingActorMaterializer) extends override def onIslandReady(): Unit = () } + +object ProcessorModuleIslandTag extends IslandTag + +final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer) extends PhaseIsland[Processor[Any, Any]] { + override def name: String = "ProcessorModulePhase" + private[this] var processor: Processor[Any, Any] = _ + + override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (Processor[Any, Any], Any) = { + val procAndMat = mod.asInstanceOf[ProcessorModule[Any, Any, Any]].createProcessor() + processor = procAndMat._1 + procAndMat + } + + override def assignPort(in: InPort, slot: Int, logic: Processor[Any, Any]): Unit = () + override def assignPort(out: OutPort, slot: Int, logic: Processor[Any, Any]): Unit = () + + override def createPublisher(out: OutPort, logic: Processor[Any, Any]): Publisher[Any] = logic + override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor) + + override def onIslandReady(): Unit = () +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index c3009e0559..bfed635ab2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -107,7 +107,7 @@ private[akka] final class FanoutPublisherSink[In]( val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer) val impl = actorMaterializer.actorOf( context, - FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes))) + FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(context.effectiveAttributes))) val fanoutProcessor = new ActorProcessor[In, In](impl) impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]]) // Resolve cyclic dependency with actor. This MUST be the first message no matter what. diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index cacffef7f9..2c9848f901 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -356,5 +356,6 @@ final case class ProcessorModule[In, Out, Mat]( override def withAttributes(attributes: Attributes) = copy(attributes = attributes) override def toString: String = f"ProcessorModule [${System.identityHashCode(this)}%08x]" - override private[stream] def traversalBuilder = LinearTraversalBuilder.fromModule(this) + override private[stream] def traversalBuilder = + LinearTraversalBuilder.fromModule(this).makeIsland(ProcessorModuleIslandTag) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 69ff2d421b..98740ee8de 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -612,7 +612,7 @@ final class GraphInterpreterShell( /** * INTERNAL API */ -class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging { +final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging { import ActorGraphInterpreter._ var activeInterpreters = Set.empty[GraphInterpreterShell]