+str #22425 re-implement sub-fusing with new materializer (#22478)

This commit is contained in:
Konrad `ktoso` Malawski 2017-03-08 13:10:36 +01:00 committed by drewhk
parent 15c57e9c93
commit b8c7d116d5
4 changed files with 75 additions and 61 deletions

View file

@ -25,7 +25,6 @@ import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.PatienceConfiguration.Timeout
import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSource
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
object FlowGroupBySpec { object FlowGroupBySpec {

View file

@ -6,13 +6,14 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import akka.actor._ import akka.actor._
import akka.annotation.InternalApi
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.pattern.ask import akka.pattern.ask
import akka.stream._ import akka.stream._
import akka.stream.impl.fusing.GraphInterpreterShell import akka.stream.impl.fusing.GraphInterpreterShell
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, ExecutionContextExecutor } import scala.concurrent.{Await, ExecutionContextExecutor}
/** /**
* ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell * ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell
@ -21,21 +22,21 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer {
override def withNamePrefix(name: String): ExtendedActorMaterializer override def withNamePrefix(name: String): ExtendedActorMaterializer
/** /** INTERNAL API */
* INTERNAL API @InternalApi def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat
*/
def materialize[Mat](
_runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef): Mat
/** /** INTERNAL API */
* INTERNAL API @InternalApi def materialize[Mat](
*/
def materialize[Mat](
_runnableGraph: Graph[ClosedShape, Mat], _runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef,
initialAttributes: Attributes): Mat initialAttributes: Attributes): Mat
/** INTERNAL API */
@InternalApi private[akka] def materialize[Mat](
graph: Graph[ClosedShape, Mat],
initialAttributes: Attributes,
defaultPhase: Phase[Any],
phases: Map[IslandTag, Phase[Any]]): Mat
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -78,13 +79,32 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer {
} }
/**
* This materializer replaces the default phase with one that will fuse stages into an existing interpreter (via `registerShell`),
* rather than start a new actor for each of them.
*
* The default phases are left in-tact since we still respect `.async` and other tags that were marked within a sub-fused graph.
*/
private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ActorRef) extends Materializer { private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ActorRef) extends Materializer {
require(registerShell ne null, "When using SubFusing the subflowFuser MUST NOT be null.") // FIXME remove check?
val subFusingPhase = new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = {
new GraphStageIsland(settings, materializer, islandName, Some(registerShell)).asInstanceOf[PhaseIsland[Any]]
}
}
override def executionContext: ExecutionContextExecutor = delegate.executionContext override def executionContext: ExecutionContextExecutor = delegate.executionContext
override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell) override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat =
delegate.materialize(runnable)
override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = {
delegate.materialize(runnable, registerShell, initialAttributes) if (PhasedFusingActorMaterializer.Debug) println(s"Using [${getClass.getSimpleName}] to materialize [${runnable}]")
val phases = PhasedFusingActorMaterializer.DefaultPhases
delegate.materialize(runnable, initialAttributes, subFusingPhase, phases)
}
override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task) override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task)

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl package akka.stream.impl
import java.util import java.util
@ -27,16 +30,15 @@ object PhasedFusingActorMaterializer {
val Debug = false val Debug = false
val DefaultPhase: Phase[Any] = new Phase[Any] { val DefaultPhase: Phase[Any] = new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] =
islandName: String): PhaseIsland[Any] = new GraphStageIsland(settings, materializer, islandName, subflowFuser = None).asInstanceOf[PhaseIsland[Any]]
new GraphStageIsland(settings, materializer, islandName).asInstanceOf[PhaseIsland[Any]]
} }
val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]]( val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]](
SinkModuleIslandTag new Phase[Any] { SinkModuleIslandTag new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer,
islandName: String): PhaseIsland[Any] = islandName: String): PhaseIsland[Any] =
(new SinkModulePhase(materializer, islandName)).asInstanceOf[PhaseIsland[Any]] new SinkModulePhase(materializer, islandName).asInstanceOf[PhaseIsland[Any]]
}, },
SourceModuleIslandTag new Phase[Any] { SourceModuleIslandTag new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer,
@ -369,6 +371,11 @@ case class PhasedFusingActorMaterializer(
} }
} }
override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match {
case Deploy.NoDispatcherGiven Dispatchers.DefaultDispatcherId
case other other
})
override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable = override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable =
system.scheduler.schedule(initialDelay, interval, task)(executionContext) system.scheduler.schedule(initialDelay, interval, task)(executionContext)
@ -376,37 +383,19 @@ case class PhasedFusingActorMaterializer(
system.scheduler.scheduleOnce(delay, task)(executionContext) system.scheduler.scheduleOnce(delay, task)(executionContext)
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat =
materialize(_runnableGraph, null, defaultInitialAttributes) materialize(_runnableGraph, defaultInitialAttributes)
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat =
materialize(_runnableGraph, null, initialAttributes)
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], subflowFuser: (GraphInterpreterShell) ActorRef): Mat =
materialize(_runnableGraph, subflowFuser, defaultInitialAttributes)
override def makeLogger(logSource: Class[_]): LoggingAdapter =
Logging(system, logSource)
override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match {
case Deploy.NoDispatcherGiven Dispatchers.DefaultDispatcherId
case other other
})
override def materialize[Mat]( override def materialize[Mat](
_runnableGraph: Graph[ClosedShape, Mat], _runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: (GraphInterpreterShell) ActorRef, initialAttributes: Attributes): Mat =
initialAttributes: Attributes): Mat = {
materialize( materialize(
_runnableGraph, _runnableGraph,
subflowFuser,
initialAttributes, initialAttributes,
PhasedFusingActorMaterializer.DefaultPhase, PhasedFusingActorMaterializer.DefaultPhase,
PhasedFusingActorMaterializer.DefaultPhases) PhasedFusingActorMaterializer.DefaultPhases)
}
def materialize[Mat]( override def materialize[Mat](
graph: Graph[ClosedShape, Mat], graph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef,
initialAttributes: Attributes, initialAttributes: Attributes,
defaultPhase: Phase[Any], defaultPhase: Phase[Any],
phases: Map[IslandTag, Phase[Any]]): Mat = { phases: Map[IslandTag, Phase[Any]]): Mat = {
@ -505,6 +494,9 @@ case class PhasedFusingActorMaterializer(
matValueStack.peekLast().asInstanceOf[Mat] matValueStack.peekLast().asInstanceOf[Mat]
} }
override def makeLogger(logSource: Class[_]): LoggingAdapter =
Logging(system, logSource)
} }
trait IslandTag trait IslandTag
@ -539,7 +531,8 @@ object GraphStageTag extends IslandTag
final class GraphStageIsland( final class GraphStageIsland(
effectiveSettings: ActorMaterializerSettings, effectiveSettings: ActorMaterializerSettings,
materializer: PhasedFusingActorMaterializer, materializer: PhasedFusingActorMaterializer,
islandName: String) extends PhaseIsland[GraphStageLogic] { islandName: String,
subflowFuser: Option[GraphInterpreterShell ActorRef]) extends PhaseIsland[GraphStageLogic] {
// TODO: remove these // TODO: remove these
private val logicArrayType = Array.empty[GraphStageLogic] private val logicArrayType = Array.empty[GraphStageLogic]
private[this] val logics = new ArrayList[GraphStageLogic](64) private[this] val logics = new ArrayList[GraphStageLogic](64)
@ -655,15 +648,17 @@ final class GraphStageIsland(
shell.connections = finalConnections shell.connections = finalConnections
shell.logics = logics.toArray(logicArrayType) shell.logics = logics.toArray(logicArrayType)
// TODO: Subfusing // TODO make OptionVal
// if (subflowFuser != null) { subflowFuser match {
// subflowFuser(shell) case Some(fuseIntoExistingInterperter)
// } else { fuseIntoExistingInterperter(shell)
val props = ActorGraphInterpreter.props(shell)
.withDispatcher(effectiveSettings.dispatcher)
materializer.actorOf(props, islandName)
// }
case _
val props = ActorGraphInterpreter.props(shell)
.withDispatcher(effectiveSettings.dispatcher)
materializer.actorOf(props, islandName)
}
} }
override def toString: String = "GraphStagePhase" override def toString: String = "GraphStagePhase"

View file

@ -25,7 +25,7 @@ import scala.util.control.NonFatal
*/ */
object ActorGraphInterpreter { object ActorGraphInterpreter {
object ResumeActor extends DeadLetterSuppression with NoSerializationVerificationNeeded object Resume extends DeadLetterSuppression with NoSerializationVerificationNeeded
trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded { trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded {
def shell: GraphInterpreterShell def shell: GraphInterpreterShell
@ -443,12 +443,11 @@ final class GraphInterpreterShell(
} }
final case class ResumeShell(shell: GraphInterpreterShell) extends BoundaryEvent { final case class ResumeShell(shell: GraphInterpreterShell) extends BoundaryEvent {
override def execute(eventLimit: Int): Int = { override def execute(eventLimit: Int): Int =
if (!waitingForShutdown) { if (!waitingForShutdown) {
if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume")
if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit
} else eventLimit } else eventLimit
}
} }
final case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent { final case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent {
@ -541,7 +540,7 @@ final class GraphInterpreterShell(
private var waitingForShutdown: Boolean = false private var waitingForShutdown: Boolean = false
private val resume = ResumeShell(this) val resume = ResumeShell(this)
def sendResume(sendResume: Boolean): Unit = { def sendResume(sendResume: Boolean): Unit = {
resumeScheduled = true resumeScheduled = true
@ -606,7 +605,7 @@ final class GraphInterpreterShell(
} }
// TODO: Fix debug string // TODO: Fix debug string
override def toString: String = s"GraphInterpreterShell\n" // ${assembly.toString.replace("\n", "\n ")}" override def toString: String = s"GraphInterpreterShell" // \n${assembly.toString.replace("\n", "\n ")}"
} }
/** /**
@ -621,7 +620,7 @@ final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor
def tryInit(shell: GraphInterpreterShell): Boolean = def tryInit(shell: GraphInterpreterShell): Boolean =
try { try {
currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit(_), currentLimit) currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit, currentLimit)
if (GraphInterpreter.Debug) println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}") if (GraphInterpreter.Debug) println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}")
if (shell.isTerminated) false if (shell.isTerminated) false
else { else {
@ -641,13 +640,13 @@ final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor
private var shortCircuitBuffer: util.ArrayDeque[Any] = null private var shortCircuitBuffer: util.ArrayDeque[Any] = null
def enqueueToShortCircuit(input: Any): Unit = { def enqueueToShortCircuit(input: Any): Unit = {
if (shortCircuitBuffer == null) shortCircuitBuffer = new util.ArrayDeque[Any]() if (shortCircuitBuffer eq null) shortCircuitBuffer = new util.ArrayDeque[Any]()
shortCircuitBuffer.addLast(input) shortCircuitBuffer.addLast(input)
} }
def registerShell(shell: GraphInterpreterShell): ActorRef = { def registerShell(shell: GraphInterpreterShell): ActorRef = {
newShells ::= shell newShells ::= shell
enqueueToShortCircuit(shell.ResumeShell(shell)) enqueueToShortCircuit(Resume)
self self
} }
@ -678,13 +677,14 @@ final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor
while (!shortCircuitBuffer.isEmpty && currentLimit > 0 && activeInterpreters.nonEmpty) while (!shortCircuitBuffer.isEmpty && currentLimit > 0 && activeInterpreters.nonEmpty)
shortCircuitBuffer.poll() match { shortCircuitBuffer.poll() match {
case b: BoundaryEvent processEvent(b) case b: BoundaryEvent processEvent(b)
case ResumeActor finishShellRegistration() case Resume finishShellRegistration()
} }
if (!shortCircuitBuffer.isEmpty && currentLimit == 0) self ! ResumeActor if (!shortCircuitBuffer.isEmpty && currentLimit == 0) self ! Resume
} }
private def processEvent(b: BoundaryEvent): Unit = { private def processEvent(b: BoundaryEvent): Unit = {
val shell = b.shell val shell = b.shell
if (!shell.isTerminated && (shell.isInitialized || tryInit(shell))) { if (!shell.isTerminated && (shell.isInitialized || tryInit(shell))) {
try currentLimit = shell.processEvent(b, currentLimit) try currentLimit = shell.processEvent(b, currentLimit)
catch { catch {
@ -704,7 +704,7 @@ final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor
processEvent(b) processEvent(b)
if (shortCircuitBuffer != null) shortCircuitBatch() if (shortCircuitBuffer != null) shortCircuitBatch()
case ResumeActor case Resume
currentLimit = eventLimit currentLimit = eventLimit
if (shortCircuitBuffer != null) shortCircuitBatch() if (shortCircuitBuffer != null) shortCircuitBatch()