#21743: FlattenMerge should propagate outer attributes

Added Materializer.materialize() version that takes explicit initial attributes
This commit is contained in:
drewhk 2016-11-09 20:14:04 +01:00 committed by Johan Andrén
parent 4dd969c0ae
commit 91b522e186
5 changed files with 68 additions and 7 deletions

View file

@ -4,8 +4,10 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.NotUsed import akka.NotUsed
import akka.stream.{ ActorMaterializerSettings, ActorMaterializer } import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.stream._
import akka.stream.testkit.Utils.assertAllStagesStopped import akka.stream.testkit.Utils.assertAllStagesStopped
import scala.concurrent._ import scala.concurrent._
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.{ StreamSpec, TestPublisher }
@ -165,5 +167,31 @@ class FlowFlattenMergeSpec extends StreamSpec {
elems should ===((0 until 1000).toSet) elems should ===((0 until 1000).toSet)
} }
val attributesSource = Source.fromGraph(
new GraphStage[SourceShape[Attributes]] {
val out = Outlet[Attributes]("AttributesSource.out")
override val shape: SourceShape[Attributes] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
override def onPull(): Unit = {
push(out, inheritedAttributes)
completeStage()
}
setHandler(out, this)
}
}
)
"propagate attributes to inner streams" in assertAllStagesStopped {
val f = Source.single(attributesSource.addAttributes(Attributes.name("inner")))
.flatMapMerge(1, identity)
.addAttributes(Attributes.name("outer"))
.runWith(Sink.head)
val attributes = Await.result(f, 3.seconds).attributeList
attributes should contain(Attributes.Name("inner"))
attributes should contain(Attributes.Name("outer"))
attributes.indexOf(Attributes.Name("outer")) < attributes.indexOf(Attributes.Name("inner")) should be(true)
}
} }
} }

View file

@ -26,6 +26,13 @@ abstract class Materializer {
*/ */
def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
/**
* This method interprets the given Flow description and creates the running
* stream using an explicitly provided [[Attributes]] as top level attributes. The result can be highly
* implementation specific, ranging from local actor chains to remote-deployed processing networks.
*/
def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat
/** /**
* Running a flow graph will require execution resources, as will computations * Running a flow graph will require execution resources, as will computations
* within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]] * within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]]
@ -62,6 +69,9 @@ private[akka] object NoMaterializer extends Materializer {
throw new UnsupportedOperationException("NoMaterializer cannot be named") throw new UnsupportedOperationException("NoMaterializer cannot be named")
override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat =
throw new UnsupportedOperationException("NoMaterializer cannot materialize") throw new UnsupportedOperationException("NoMaterializer cannot materialize")
override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat =
throw new UnsupportedOperationException("NoMaterializer cannot materialize")
override def executionContext: ExecutionContextExecutor = override def executionContext: ExecutionContextExecutor =
throw new UnsupportedOperationException("NoMaterializer does not provide an ExecutionContext") throw new UnsupportedOperationException("NoMaterializer does not provide an ExecutionContext")

View file

@ -39,6 +39,14 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer {
_runnableGraph: Graph[ClosedShape, Mat], _runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef): Mat subflowFuser: GraphInterpreterShell ActorRef): Mat
/**
* INTERNAL API
*/
def materialize[Mat](
_runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef,
initialAttributes: Attributes): Mat
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -109,7 +117,7 @@ private[akka] case class ActorMaterializerImpl(
private[this] def createFlowName(): String = flowNames.next() private[this] def createFlowName(): String = flowNames.next()
private val initialAttributes = Attributes( private val defaultInitialAttributes = Attributes(
Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) ::
ActorAttributes.Dispatcher(settings.dispatcher) :: ActorAttributes.Dispatcher(settings.dispatcher) ::
ActorAttributes.SupervisionStrategy(settings.supervisionDecider) :: ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
@ -135,11 +143,19 @@ private[akka] case class ActorMaterializerImpl(
system.scheduler.scheduleOnce(delay, task)(executionContext) system.scheduler.scheduleOnce(delay, task)(executionContext)
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat =
materialize(_runnableGraph, null) materialize(_runnableGraph, null, defaultInitialAttributes)
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat =
materialize(_runnableGraph, null, initialAttributes)
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], subflowFuser: (GraphInterpreterShell) ActorRef): Mat =
materialize(_runnableGraph, subflowFuser, defaultInitialAttributes)
override def materialize[Mat]( override def materialize[Mat](
_runnableGraph: Graph[ClosedShape, Mat], _runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef): Mat = { subflowFuser: GraphInterpreterShell ActorRef,
initialAttributes: Attributes
): Mat = {
val runnableGraph = val runnableGraph =
if (settings.autoFusing) Fusing.aggressive(_runnableGraph) if (settings.autoFusing) Fusing.aggressive(_runnableGraph)
else _runnableGraph else _runnableGraph
@ -255,6 +271,9 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell) override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell)
override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat =
delegate.materialize(runnable, registerShell, initialAttributes)
override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task) override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task)
override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable = override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable =

View file

@ -31,7 +31,7 @@ final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Gr
override def initialAttributes = DefaultAttributes.flattenMerge override def initialAttributes = DefaultAttributes.flattenMerge
override val shape = FlowShape(in, out) override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { override def createLogic(enclosingAttributes: Attributes) = new GraphStageLogic(shape) {
var sources = Set.empty[SubSinkInlet[T]] var sources = Set.empty[SubSinkInlet[T]]
def activeSources = sources.size def activeSources = sources.size
@ -82,7 +82,8 @@ final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Gr
}) })
sinkIn.pull() sinkIn.pull()
sources += sinkIn sources += sinkIn
Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer) val graph = Source.fromGraph(source).to(sinkIn.sink)
interpreter.subFusingMaterializer.materialize(graph, initialAttributes = enclosingAttributes)
} }
def removeSource(src: SubSinkInlet[T]): Unit = { def removeSource(src: SubSinkInlet[T]): Unit = {

View file

@ -1016,6 +1016,9 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.VirtualPathContainer.this"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.VirtualPathContainer.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteSystemDaemon.this") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteSystemDaemon.this")
),
"2.4.12" -> Seq(
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.materialize")
) )
) )
} }