diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala index 52b016075a..6becebf0f1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -4,8 +4,10 @@ package akka.stream.scaladsl import akka.NotUsed -import akka.stream.{ ActorMaterializerSettings, ActorMaterializer } +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } +import akka.stream._ import akka.stream.testkit.Utils.assertAllStagesStopped + import scala.concurrent._ import scala.concurrent.duration._ import akka.stream.testkit.{ StreamSpec, TestPublisher } @@ -165,5 +167,31 @@ class FlowFlattenMergeSpec extends StreamSpec { elems should ===((0 until 1000).toSet) } + val attributesSource = Source.fromGraph( + new GraphStage[SourceShape[Attributes]] { + val out = Outlet[Attributes]("AttributesSource.out") + override val shape: SourceShape[Attributes] = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { + override def onPull(): Unit = { + push(out, inheritedAttributes) + completeStage() + } + setHandler(out, this) + } + } + ) + + "propagate attributes to inner streams" in assertAllStagesStopped { + val f = Source.single(attributesSource.addAttributes(Attributes.name("inner"))) + .flatMapMerge(1, identity) + .addAttributes(Attributes.name("outer")) + .runWith(Sink.head) + + val attributes = Await.result(f, 3.seconds).attributeList + attributes should contain(Attributes.Name("inner")) + attributes should contain(Attributes.Name("outer")) + attributes.indexOf(Attributes.Name("outer")) < attributes.indexOf(Attributes.Name("inner")) should be(true) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index 4467c21944..2d9a80287b 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -26,6 +26,13 @@ abstract class Materializer { */ def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat + /** + * This method interprets the given Flow description and creates the running + * stream using an explicitly provided [[Attributes]] as top level attributes. The result can be highly + * implementation specific, ranging from local actor chains to remote-deployed processing networks. + */ + def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat + /** * Running a flow graph will require execution resources, as will computations * within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]] @@ -62,6 +69,9 @@ private[akka] object NoMaterializer extends Materializer { throw new UnsupportedOperationException("NoMaterializer cannot be named") override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = throw new UnsupportedOperationException("NoMaterializer cannot materialize") + override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = + throw new UnsupportedOperationException("NoMaterializer cannot materialize") + override def executionContext: ExecutionContextExecutor = throw new UnsupportedOperationException("NoMaterializer does not provide an ExecutionContext") diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 28e0191c26..41436c352d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -39,6 +39,14 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { _runnableGraph: Graph[ClosedShape, Mat], subflowFuser: GraphInterpreterShell ⇒ ActorRef): Mat + /** + * INTERNAL API + */ + def materialize[Mat]( + _runnableGraph: Graph[ClosedShape, Mat], + subflowFuser: GraphInterpreterShell ⇒ ActorRef, + initialAttributes: Attributes): Mat + /** * INTERNAL API */ @@ -109,7 +117,7 @@ private[akka] case class ActorMaterializerImpl( private[this] def createFlowName(): String = flowNames.next() - private val initialAttributes = Attributes( + private val defaultInitialAttributes = Attributes( Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: ActorAttributes.Dispatcher(settings.dispatcher) :: ActorAttributes.SupervisionStrategy(settings.supervisionDecider) :: @@ -135,11 +143,19 @@ private[akka] case class ActorMaterializerImpl( system.scheduler.scheduleOnce(delay, task)(executionContext) override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = - materialize(_runnableGraph, null) + materialize(_runnableGraph, null, defaultInitialAttributes) + + override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = + materialize(_runnableGraph, null, initialAttributes) + + override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], subflowFuser: (GraphInterpreterShell) ⇒ ActorRef): Mat = + materialize(_runnableGraph, subflowFuser, defaultInitialAttributes) override def materialize[Mat]( - _runnableGraph: Graph[ClosedShape, Mat], - subflowFuser: GraphInterpreterShell ⇒ ActorRef): Mat = { + _runnableGraph: Graph[ClosedShape, Mat], + subflowFuser: GraphInterpreterShell ⇒ ActorRef, + initialAttributes: Attributes + ): Mat = { val runnableGraph = if (settings.autoFusing) Fusing.aggressive(_runnableGraph) else _runnableGraph @@ -255,6 +271,9 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell) + override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = + delegate.materialize(runnable, registerShell, initialAttributes) + override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task) override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable = diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 1ce1a03a2a..68ed324447 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -31,7 +31,7 @@ final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Gr override def initialAttributes = DefaultAttributes.flattenMerge override val shape = FlowShape(in, out) - override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + override def createLogic(enclosingAttributes: Attributes) = new GraphStageLogic(shape) { var sources = Set.empty[SubSinkInlet[T]] def activeSources = sources.size @@ -82,7 +82,8 @@ final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Gr }) sinkIn.pull() sources += sinkIn - Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer) + val graph = Source.fromGraph(source).to(sinkIn.sink) + interpreter.subFusingMaterializer.materialize(graph, initialAttributes = enclosingAttributes) } def removeSource(src: SubSinkInlet[T]): Unit = { diff --git a/project/MiMa.scala b/project/MiMa.scala index 82e6f32c5e..876f7a68a2 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1016,6 +1016,9 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.VirtualPathContainer.this"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteSystemDaemon.this") + ), + "2.4.12" -> Seq( + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.materialize") ) ) }