diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 16c208f119..99ccfdbcdb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -25,7 +25,6 @@ import org.scalactic.ConversionCheckedTripleEquals import org.scalatest.concurrent.PatienceConfiguration.Timeout import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSink - import java.util.concurrent.ThreadLocalRandom object FlowGroupBySpec { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index c9ee63d252..dfad132580 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -6,13 +6,14 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicBoolean import akka.actor._ +import akka.annotation.InternalApi import akka.event.LoggingAdapter import akka.pattern.ask import akka.stream._ import akka.stream.impl.fusing.GraphInterpreterShell import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Await, ExecutionContextExecutor } +import scala.concurrent.{Await, ExecutionContextExecutor} /** * ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell @@ -21,21 +22,21 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { override def withNamePrefix(name: String): ExtendedActorMaterializer - /** - * INTERNAL API - */ - def materialize[Mat]( - _runnableGraph: Graph[ClosedShape, Mat], - subflowFuser: GraphInterpreterShell ⇒ ActorRef): Mat + /** INTERNAL API */ + @InternalApi def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat - /** - * INTERNAL API - */ - def materialize[Mat]( + /** INTERNAL API */ + @InternalApi def materialize[Mat]( _runnableGraph: Graph[ClosedShape, Mat], - subflowFuser: GraphInterpreterShell ⇒ ActorRef, initialAttributes: Attributes): Mat + /** INTERNAL API */ + @InternalApi private[akka] def materialize[Mat]( + graph: Graph[ClosedShape, Mat], + initialAttributes: Attributes, + defaultPhase: Phase[Any], + phases: Map[IslandTag, Phase[Any]]): Mat + /** * INTERNAL API */ @@ -78,13 +79,32 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { } +/** + * This materializer replaces the default phase with one that will fuse stages into an existing interpreter (via `registerShell`), + * rather than start a new actor for each of them. + * + * The default phases are left in-tact since we still respect `.async` and other tags that were marked within a sub-fused graph. + */ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer { + require(registerShell ne null, "When using SubFusing the subflowFuser MUST NOT be null.") // FIXME remove check? + + val subFusingPhase = new Phase[Any] { + override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = { + new GraphStageIsland(settings, materializer, islandName, Some(registerShell)).asInstanceOf[PhaseIsland[Any]] + } + } + override def executionContext: ExecutionContextExecutor = delegate.executionContext - override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell) + override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = + delegate.materialize(runnable) - override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = - delegate.materialize(runnable, registerShell, initialAttributes) + override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = { + if (PhasedFusingActorMaterializer.Debug) println(s"Using [${getClass.getSimpleName}] to materialize [${runnable}]") + val phases = PhasedFusingActorMaterializer.DefaultPhases + + delegate.materialize(runnable, initialAttributes, subFusingPhase, phases) + } override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task) diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 35665c0d68..2e9d64785e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ package akka.stream.impl import java.util @@ -27,16 +30,15 @@ object PhasedFusingActorMaterializer { val Debug = false val DefaultPhase: Phase[Any] = new Phase[Any] { - override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, - islandName: String): PhaseIsland[Any] = - new GraphStageIsland(settings, materializer, islandName).asInstanceOf[PhaseIsland[Any]] + override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = + new GraphStageIsland(settings, materializer, islandName, subflowFuser = None).asInstanceOf[PhaseIsland[Any]] } val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]]( SinkModuleIslandTag → new Phase[Any] { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = - (new SinkModulePhase(materializer, islandName)).asInstanceOf[PhaseIsland[Any]] + new SinkModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]] }, SourceModuleIslandTag → new Phase[Any] { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, @@ -369,6 +371,11 @@ case class PhasedFusingActorMaterializer( } } + override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { + case Deploy.NoDispatcherGiven ⇒ Dispatchers.DefaultDispatcherId + case other ⇒ other + }) + override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable = system.scheduler.schedule(initialDelay, interval, task)(executionContext) @@ -376,37 +383,19 @@ case class PhasedFusingActorMaterializer( system.scheduler.scheduleOnce(delay, task)(executionContext) override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = - materialize(_runnableGraph, null, defaultInitialAttributes) - - override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = - materialize(_runnableGraph, null, initialAttributes) - - override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], subflowFuser: (GraphInterpreterShell) ⇒ ActorRef): Mat = - materialize(_runnableGraph, subflowFuser, defaultInitialAttributes) - - override def makeLogger(logSource: Class[_]): LoggingAdapter = - Logging(system, logSource) - - override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { - case Deploy.NoDispatcherGiven ⇒ Dispatchers.DefaultDispatcherId - case other ⇒ other - }) + materialize(_runnableGraph, defaultInitialAttributes) override def materialize[Mat]( _runnableGraph: Graph[ClosedShape, Mat], - subflowFuser: (GraphInterpreterShell) ⇒ ActorRef, - initialAttributes: Attributes): Mat = { + initialAttributes: Attributes): Mat = materialize( _runnableGraph, - subflowFuser, initialAttributes, PhasedFusingActorMaterializer.DefaultPhase, PhasedFusingActorMaterializer.DefaultPhases) - } - def materialize[Mat]( + override def materialize[Mat]( graph: Graph[ClosedShape, Mat], - subflowFuser: GraphInterpreterShell ⇒ ActorRef, initialAttributes: Attributes, defaultPhase: Phase[Any], phases: Map[IslandTag, Phase[Any]]): Mat = { @@ -505,6 +494,9 @@ case class PhasedFusingActorMaterializer( matValueStack.peekLast().asInstanceOf[Mat] } + override def makeLogger(logSource: Class[_]): LoggingAdapter = + Logging(system, logSource) + } trait IslandTag @@ -539,7 +531,8 @@ object GraphStageTag extends IslandTag final class GraphStageIsland( effectiveSettings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, - islandName: String) extends PhaseIsland[GraphStageLogic] { + islandName: String, + subflowFuser: Option[GraphInterpreterShell ⇒ ActorRef]) extends PhaseIsland[GraphStageLogic] { // TODO: remove these private val logicArrayType = Array.empty[GraphStageLogic] private[this] val logics = new ArrayList[GraphStageLogic](64) @@ -655,15 +648,17 @@ final class GraphStageIsland( shell.connections = finalConnections shell.logics = logics.toArray(logicArrayType) - // TODO: Subfusing - // if (subflowFuser != null) { - // subflowFuser(shell) - // } else { - val props = ActorGraphInterpreter.props(shell) - .withDispatcher(effectiveSettings.dispatcher) - materializer.actorOf(props, islandName) - // } + // TODO make OptionVal + subflowFuser match { + case Some(fuseIntoExistingInterperter) ⇒ + fuseIntoExistingInterperter(shell) + case _ ⇒ + val props = ActorGraphInterpreter.props(shell) + .withDispatcher(effectiveSettings.dispatcher) + + materializer.actorOf(props, islandName) + } } override def toString: String = "GraphStagePhase" diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 98740ee8de..4fc3e7c3a3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -25,7 +25,7 @@ import scala.util.control.NonFatal */ object ActorGraphInterpreter { - object ResumeActor extends DeadLetterSuppression with NoSerializationVerificationNeeded + object Resume extends DeadLetterSuppression with NoSerializationVerificationNeeded trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded { def shell: GraphInterpreterShell @@ -443,12 +443,11 @@ final class GraphInterpreterShell( } final case class ResumeShell(shell: GraphInterpreterShell) extends BoundaryEvent { - override def execute(eventLimit: Int): Int = { + override def execute(eventLimit: Int): Int = if (!waitingForShutdown) { if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit } else eventLimit - } } final case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent { @@ -541,7 +540,7 @@ final class GraphInterpreterShell( private var waitingForShutdown: Boolean = false - private val resume = ResumeShell(this) + val resume = ResumeShell(this) def sendResume(sendResume: Boolean): Unit = { resumeScheduled = true @@ -606,7 +605,7 @@ final class GraphInterpreterShell( } // TODO: Fix debug string - override def toString: String = s"GraphInterpreterShell\n" // ${assembly.toString.replace("\n", "\n ")}" + override def toString: String = s"GraphInterpreterShell" // \n${assembly.toString.replace("\n", "\n ")}" } /** @@ -621,7 +620,7 @@ final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor def tryInit(shell: GraphInterpreterShell): Boolean = try { - currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit(_), currentLimit) + currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit, currentLimit) if (GraphInterpreter.Debug) println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}") if (shell.isTerminated) false else { @@ -641,13 +640,13 @@ final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor private var shortCircuitBuffer: util.ArrayDeque[Any] = null def enqueueToShortCircuit(input: Any): Unit = { - if (shortCircuitBuffer == null) shortCircuitBuffer = new util.ArrayDeque[Any]() + if (shortCircuitBuffer eq null) shortCircuitBuffer = new util.ArrayDeque[Any]() shortCircuitBuffer.addLast(input) } def registerShell(shell: GraphInterpreterShell): ActorRef = { newShells ::= shell - enqueueToShortCircuit(shell.ResumeShell(shell)) + enqueueToShortCircuit(Resume) self } @@ -678,13 +677,14 @@ final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor while (!shortCircuitBuffer.isEmpty && currentLimit > 0 && activeInterpreters.nonEmpty) shortCircuitBuffer.poll() match { case b: BoundaryEvent ⇒ processEvent(b) - case ResumeActor ⇒ finishShellRegistration() + case Resume ⇒ finishShellRegistration() } - if (!shortCircuitBuffer.isEmpty && currentLimit == 0) self ! ResumeActor + if (!shortCircuitBuffer.isEmpty && currentLimit == 0) self ! Resume } private def processEvent(b: BoundaryEvent): Unit = { val shell = b.shell + if (!shell.isTerminated && (shell.isInitialized || tryInit(shell))) { try currentLimit = shell.processEvent(b, currentLimit) catch { @@ -704,7 +704,7 @@ final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor processEvent(b) if (shortCircuitBuffer != null) shortCircuitBatch() - case ResumeActor ⇒ + case Resume ⇒ currentLimit = eventLimit if (shortCircuitBuffer != null) shortCircuitBatch()