#22424: Use InputBuffer attribute for asynchronous inputs (#22441)

This commit is contained in:
drewhk 2017-03-02 13:44:19 +01:00 committed by GitHub
parent ba63c7af8d
commit a2203d7b13
2 changed files with 26 additions and 5 deletions

View file

@ -7,6 +7,7 @@ import akka.NotUsed
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill, Props } import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill, Props }
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter } import akka.event.{ Logging, LoggingAdapter }
import akka.stream.Attributes.InputBuffer
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary } import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary }
@ -145,12 +146,25 @@ class IslandTracking(
length length
} }
def enterIsland(tag: IslandTag): ExitIsland = { def enterIsland(tag: IslandTag, attributes: Attributes): ExitIsland = {
completeSegment() completeSegment()
val previousPhase = currentPhase val previousPhase = currentPhase
val previousIslandOffset = currentIslandGlobalOffset val previousIslandOffset = currentIslandGlobalOffset
currentPhase = phases(tag)(settings, materializer) val effectiveSettings: ActorMaterializerSettings = {
import Attributes._
import ActorAttributes._
attributes.attributeList.foldLeft(settings) { (s, attr)
attr match {
case InputBuffer(initial, max) s.withInputBuffer(initial, max)
case Dispatcher(dispatcher) s.withDispatcher(dispatcher)
case SupervisionStrategy(decider) s.withSupervisionStrategy(decider)
case _ s
}
}
}
currentPhase = phases(tag)(effectiveSettings, materializer)
if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase")
// Resolve the phase to be used to materialize this island // Resolve the phase to be used to materialize this island
@ -454,7 +468,7 @@ case class PhasedFusingActorMaterializer(
attributesStack.removeLast() attributesStack.removeLast()
if (Debug) println(s"ATTR POP") if (Debug) println(s"ATTR POP")
case EnterIsland(tag, island) case EnterIsland(tag, island)
traversalStack.addLast(islandTracking.enterIsland(tag)) traversalStack.addLast(islandTracking.enterIsland(tag, attributesStack.getLast))
nextStep = island nextStep = island
case ex: ExitIsland case ex: ExitIsland
islandTracking.exitIsland(ex) islandTracking.exitIsland(ex)
@ -523,6 +537,7 @@ final class GraphStageIsland(
val stageModule = mod.asInstanceOf[GraphStageModule[Shape, Any]] val stageModule = mod.asInstanceOf[GraphStageModule[Shape, Any]]
val matAndLogic = stageModule.stage.createLogicAndMaterializedValue(attributes) val matAndLogic = stageModule.stage.createLogicAndMaterializedValue(attributes)
val logic = matAndLogic._1 val logic = matAndLogic._1
logic.attributes = attributes
logics.add(logic) logics.add(logic)
logic.stageId = logics.size() - 1 logic.stageId = logics.size() - 1
matAndLogic matAndLogic
@ -584,10 +599,11 @@ final class GraphStageIsland(
} }
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = {
// TODO: proper input buffer sizes from attributes
val connection = conn(slot) val connection = conn(slot)
// TODO: proper input port debug string (currently prints the stage) // TODO: proper input port debug string (currently prints the stage)
val boundary = new BatchingActorInputBoundary(16, shell, publisher, connection.inOwner.toString) val bufferSize = connection.inOwner.attributes.get[InputBuffer].get.max
val boundary =
new BatchingActorInputBoundary(bufferSize, shell, publisher, connection.inOwner.toString)
logics.add(boundary) logics.add(boundary)
boundary.stageId = logics.size() - 1 boundary.stageId = logics.size() - 1

View file

@ -216,6 +216,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/ */
private[stream] var stageId: Int = Int.MinValue private[stream] var stageId: Int = Int.MinValue
/**
* INTERNAL API
*/
private[stream] var attributes: Attributes = Attributes.none
/** /**
* INTERNAL API * INTERNAL API
*/ */