From a2203d7b13305793b3257e4a5d77c076d0d1fb54 Mon Sep 17 00:00:00 2001 From: drewhk Date: Thu, 2 Mar 2017 13:44:19 +0100 Subject: [PATCH] #22424: Use InputBuffer attribute for asynchronous inputs (#22441) --- .../impl/PhasedFusingActorMaterializer.scala | 26 +++++++++++++++---- .../scala/akka/stream/stage/GraphStage.scala | 5 ++++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 03478a917f..753940e971 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -7,6 +7,7 @@ import akka.NotUsed import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill, Props } import akka.dispatch.Dispatchers import akka.event.{ Logging, LoggingAdapter } +import akka.stream.Attributes.InputBuffer import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.fusing.ActorGraphInterpreter.{ ActorOutputBoundary, BatchingActorInputBoundary } @@ -145,12 +146,25 @@ class IslandTracking( length } - def enterIsland(tag: IslandTag): ExitIsland = { + def enterIsland(tag: IslandTag, attributes: Attributes): ExitIsland = { completeSegment() val previousPhase = currentPhase val previousIslandOffset = currentIslandGlobalOffset - currentPhase = phases(tag)(settings, materializer) + val effectiveSettings: ActorMaterializerSettings = { + import Attributes._ + import ActorAttributes._ + attributes.attributeList.foldLeft(settings) { (s, attr) ⇒ + attr match { + case InputBuffer(initial, max) ⇒ s.withInputBuffer(initial, max) + case Dispatcher(dispatcher) ⇒ s.withDispatcher(dispatcher) + case SupervisionStrategy(decider) ⇒ s.withSupervisionStrategy(decider) + case _ ⇒ s + } + } + } + + currentPhase = phases(tag)(effectiveSettings, materializer) if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") // Resolve the phase to be used to materialize this island @@ -454,7 +468,7 @@ case class PhasedFusingActorMaterializer( attributesStack.removeLast() if (Debug) println(s"ATTR POP") case EnterIsland(tag, island) ⇒ - traversalStack.addLast(islandTracking.enterIsland(tag)) + traversalStack.addLast(islandTracking.enterIsland(tag, attributesStack.getLast)) nextStep = island case ex: ExitIsland ⇒ islandTracking.exitIsland(ex) @@ -523,6 +537,7 @@ final class GraphStageIsland( val stageModule = mod.asInstanceOf[GraphStageModule[Shape, Any]] val matAndLogic = stageModule.stage.createLogicAndMaterializedValue(attributes) val logic = matAndLogic._1 + logic.attributes = attributes logics.add(logic) logic.stageId = logics.size() - 1 matAndLogic @@ -584,10 +599,11 @@ final class GraphStageIsland( } override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { - // TODO: proper input buffer sizes from attributes val connection = conn(slot) // TODO: proper input port debug string (currently prints the stage) - val boundary = new BatchingActorInputBoundary(16, shell, publisher, connection.inOwner.toString) + val bufferSize = connection.inOwner.attributes.get[InputBuffer].get.max + val boundary = + new BatchingActorInputBoundary(bufferSize, shell, publisher, connection.inOwner.toString) logics.add(boundary) boundary.stageId = logics.size() - 1 diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 37e987c09a..366ddec721 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -216,6 +216,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ private[stream] var stageId: Int = Int.MinValue + /** + * INTERNAL API + */ + private[stream] var attributes: Attributes = Attributes.none + /** * INTERNAL API */