ContextPropagation enhancement

Improve flexibility of ContextPropagation to support custom stream stages with buffers
This commit is contained in:
Yury Gribkov 2021-08-09 13:03:35 -07:00
parent 6175523737
commit eb0ca25df6
2 changed files with 9 additions and 1 deletions

View file

@ -12,6 +12,9 @@ import akka.annotation.InternalApi
@InternalApi private[akka] trait ContextPropagation {
def suspendContext(): Unit
def resumeContext(): Unit
def currentContext(): AnyRef
def resumeContext(context: AnyRef): Unit
def isEnabled: Boolean
}
/**
@ -28,4 +31,7 @@ import akka.annotation.InternalApi
private[akka] final class ContextPropagationImpl extends ContextPropagation {
def suspendContext(): Unit = ()
def resumeContext(): Unit = ()
def currentContext(): AnyRef = null
def resumeContext(context: AnyRef): Unit = ()
def isEnabled: Boolean = false
}

View file

@ -776,8 +776,10 @@ private final case class SavedIslandData(
override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = {
val connection = conn(slot)
val bufferSize = publisherAttributes.mandatoryAttribute[InputBuffer].max
val boundary =
val boundary = {
//@YG it was: val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max
new BatchingActorInputBoundary(bufferSize, shell, publisher, "publisher.in")
}
logics.add(boundary)
boundary.stageId = logics.size() - 1
boundary.attributes = connection.inOwner.attributes.and(DefaultAttributes.inputBoundary)