Make flowspec great again drewhk (#22443)

* #22435: Make DirectProcessorModule work again

* #22435: Fix attributes propagation for FanoutProcessor

* #22435: Remove old "faulty-flow" spec
This commit is contained in:
drewhk 2017-03-03 10:49:34 +01:00 committed by GitHub
parent 46b869d041
commit b33339f13e
5 changed files with 30 additions and 107 deletions

View file

@ -40,48 +40,6 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re
val identity: Flow[Any, Any, NotUsed] Flow[Any, Any, NotUsed] = in in.map(e e)
val identity2: Flow[Any, Any, NotUsed] Flow[Any, Any, NotUsed] = in identity(in)
// TODO: Reenable these tests
// class BrokenActorInterpreter(_shell: GraphInterpreterShell, brokenMessage: Any)
// extends ActorGraphInterpreter(_shell) {
//
// override protected[akka] def aroundReceive(receive: Receive, msg: Any) = {
// msg match {
// case ActorGraphInterpreter.OnNext(_, 0, m) if m == brokenMessage
// throw new NullPointerException(s"I'm so broken [$m]")
// case _ super.aroundReceive(receive, msg)
// }
// }
// }
// val faultyFlow: Flow[Any, Any, NotUsed] Flow[Any, Any, NotUsed] = in in.via({
// val stage = fusing.Map({ x: Any x })
//
// val assembly = new GraphAssembly(
// Array(stage),
// Array(Attributes.none),
// Array(stage.shape.in, null),
// Array(0, -1),
// Array(null, stage.shape.out),
// Array(-1, 0))
//
// val (connections, logics) =
// assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ())
//
// val shell = new GraphInterpreterShell(assembly, connections, logics, stage.shape, settings,
// materializer.asInstanceOf[ActorMaterializerImpl])
//
// val props = Props(new BrokenActorInterpreter(shell, "a3"))
// .withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local)
// val impl = system.actorOf(props, "borken-stage-actor")
//
// val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, 0)
// val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(shell, 0) }
//
// impl ! ActorGraphInterpreter.ExposedPublisher(shell, 0, publisher)
//
// Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
// })
val toPublisher: (Source[Any, _], ActorMaterializer) Publisher[Any] =
(f, m) f.runWith(Sink.asPublisher(false))(m)
@ -535,67 +493,6 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re
Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10)
}
}
//
// "A broken Flow" must {
// "cancel upstream and call onError on current and future downstream subscribers if an internal error occurs" in {
// new ChainSetup(faultyFlow, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(16)) {
//
// def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = {
// val error = sprobe.expectError()
// error.isInstanceOf[AbruptTerminationException] should be(true)
// error.getMessage should startWith("Processor actor")
// }
//
// val downstream2 = TestSubscriber.manualProbe[Any]()
// publisher.subscribe(downstream2)
// val downstream2Subscription = downstream2.expectSubscription()
//
// downstreamSubscription.request(5)
// downstream2Subscription.request(5)
// upstream.expectRequest(upstreamSubscription, 1)
// upstreamSubscription.sendNext("a1")
// downstream.expectNext("a1")
// downstream2.expectNext("a1")
//
// upstream.expectRequest(upstreamSubscription, 1)
// upstreamSubscription.sendNext("a2")
// downstream.expectNext("a2")
// downstream2.expectNext("a2")
//
// val filters = immutable.Seq(
// EventFilter[NullPointerException](),
// EventFilter[IllegalStateException](),
// EventFilter[PostRestartException]()) // This is thrown because we attach the dummy failing actor to toplevel
// try {
// system.eventStream.publish(Mute(filters))
//
// upstream.expectRequest(upstreamSubscription, 1)
// upstreamSubscription.sendNext("a3")
// upstreamSubscription.expectCancellation()
//
// // IllegalStateException terminated abruptly
// checkError(downstream)
// checkError(downstream2)
//
// val downstream3 = TestSubscriber.manualProbe[Any]()
// publisher.subscribe(downstream3)
// downstream3.expectSubscription()
// // IllegalStateException terminated abruptly
// checkError(downstream3)
// } finally {
// system.eventStream.publish(UnMute(filters))
// }
// }
// }
//
// "suitably override attribute handling methods" in {
// import Attributes._
// val f: Flow[Int, Int, NotUsed] = Flow[Int].map(_ + 1).async.addAttributes(none).named("name")
//
// f.module.attributes.getFirst[Name] shouldEqual Some(Name("name"))
// f.module.attributes.getFirst[Attributes.AsyncBoundary.type] shouldEqual Some(AsyncBoundary)
// }
// }
object TestException extends RuntimeException with NoStackTrace

View file

@ -14,7 +14,7 @@ import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, Batc
import akka.stream.impl.fusing.GraphInterpreter.Connection
import akka.stream.impl.fusing._
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.collection.immutable.Map
import scala.concurrent.duration.FiniteDuration
@ -39,6 +39,10 @@ object PhasedFusingActorMaterializer {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] =
new SourceModulePhase(materializer).asInstanceOf[PhaseIsland[Any]]
},
ProcessorModuleIslandTag new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer): PhaseIsland[Any] =
new ProcessorModulePhase(materializer).asInstanceOf[PhaseIsland[Any]]
},
GraphStageTag DefaultPhase
)
@ -699,3 +703,24 @@ final class SinkModulePhase(materializer: PhasedFusingActorMaterializer) extends
override def onIslandReady(): Unit = ()
}
object ProcessorModuleIslandTag extends IslandTag
final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer) extends PhaseIsland[Processor[Any, Any]] {
override def name: String = "ProcessorModulePhase"
private[this] var processor: Processor[Any, Any] = _
override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (Processor[Any, Any], Any) = {
val procAndMat = mod.asInstanceOf[ProcessorModule[Any, Any, Any]].createProcessor()
processor = procAndMat._1
procAndMat
}
override def assignPort(in: InPort, slot: Int, logic: Processor[Any, Any]): Unit = ()
override def assignPort(out: OutPort, slot: Int, logic: Processor[Any, Any]): Unit = ()
override def createPublisher(out: OutPort, logic: Processor[Any, Any]): Publisher[Any] = logic
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor)
override def onIslandReady(): Unit = ()
}

View file

@ -107,7 +107,7 @@ private[akka] final class FanoutPublisherSink[In](
val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer)
val impl = actorMaterializer.actorOf(
context,
FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes)))
FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(context.effectiveAttributes)))
val fanoutProcessor = new ActorProcessor[In, In](impl)
impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.

View file

@ -356,5 +356,6 @@ final case class ProcessorModule[In, Out, Mat](
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override def toString: String = f"ProcessorModule [${System.identityHashCode(this)}%08x]"
override private[stream] def traversalBuilder = LinearTraversalBuilder.fromModule(this)
override private[stream] def traversalBuilder =
LinearTraversalBuilder.fromModule(this).makeIsland(ProcessorModuleIslandTag)
}

View file

@ -612,7 +612,7 @@ final class GraphInterpreterShell(
/**
* INTERNAL API
*/
class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging {
final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging {
import ActorGraphInterpreter._
var activeInterpreters = Set.empty[GraphInterpreterShell]