From 191bd0fe4170add3dff36e7b8ce1363f62549f08 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 17 Mar 2017 12:42:13 +0100 Subject: [PATCH] =str #22584 make subfusing work if diff materialize() is called Also: removed unecessary method with wrong name, #22350 --- .../src/main/scala/akka/cluster/ddata/ORMultiMap.scala | 1 - .../scala/akka/stream/impl/ActorMaterializerImpl.scala | 9 ++++++++- .../akka/stream/impl/PhasedFusingActorMaterializer.scala | 3 ++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala index 5c7d00e907..aa1c022d95 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala @@ -22,7 +22,6 @@ object ORMultiMap { * Java API */ def create[A, B](): ORMultiMap[A, B] = empty[A, B] - def createWithDeltaDelta[A, B](): ORMultiMap[A, B] = emptyWithValueDeltas[A, B] /** * Extract the [[ORMultiMap#entries]]. diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index ba38630603..ab446e9c62 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -96,7 +96,14 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa override def executionContext: ExecutionContextExecutor = delegate.executionContext override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = - delegate.materialize(runnable) + delegate match { + case am: PhasedFusingActorMaterializer ⇒ + materialize(runnable, am.defaultInitialAttributes) + + case other ⇒ + throw new IllegalStateException(s"SubFusing only supported by [PhasedFusingActorMaterializer], " + + s"yet was used with [${other.getClass.getName}]!") + } override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat = { if (PhasedFusingActorMaterializer.Debug) println(s"Using [${getClass.getSimpleName}] to materialize [${runnable}]") diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 2d51c492e3..e99f162d8e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -365,7 +365,8 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff private[this] def createFlowName(): String = flowNames.next() - private val defaultInitialAttributes = { + /** INTERNAL API */ + private[akka] val defaultInitialAttributes = { val a = Attributes( Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::