diff --git a/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala b/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala index 9b459bc41d..ca6d265661 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala @@ -12,6 +12,9 @@ import akka.annotation.InternalApi @InternalApi private[akka] trait ContextPropagation { def suspendContext(): Unit def resumeContext(): Unit + def currentContext(): AnyRef + def resumeContext(context: AnyRef): Unit + def isEnabled: Boolean } /** @@ -28,4 +31,7 @@ import akka.annotation.InternalApi private[akka] final class ContextPropagationImpl extends ContextPropagation { def suspendContext(): Unit = () def resumeContext(): Unit = () + def currentContext(): AnyRef = null + def resumeContext(context: AnyRef): Unit = () + def isEnabled: Boolean = false } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index b43fe90d64..eef707ec41 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -776,8 +776,10 @@ private final case class SavedIslandData( override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = { val connection = conn(slot) val bufferSize = publisherAttributes.mandatoryAttribute[InputBuffer].max - val boundary = + val boundary = { + //@YG it was: val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max new BatchingActorInputBoundary(bufferSize, shell, publisher, "publisher.in") + } logics.add(boundary) boundary.stageId = logics.size() - 1 boundary.attributes = connection.inOwner.attributes.and(DefaultAttributes.inputBoundary)