From eb0ca25df672b1c8173cd347565bb2bffa389f2f Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Mon, 9 Aug 2021 13:03:35 -0700 Subject: [PATCH 1/5] ContextPropagation enhancement Improve flexibility of ContextPropagation to support custom stream stages with buffers --- .../main/scala/akka/stream/impl/ContextPropagation.scala | 6 ++++++ .../akka/stream/impl/PhasedFusingActorMaterializer.scala | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala b/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala index 9b459bc41d..ca6d265661 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala @@ -12,6 +12,9 @@ import akka.annotation.InternalApi @InternalApi private[akka] trait ContextPropagation { def suspendContext(): Unit def resumeContext(): Unit + def currentContext(): AnyRef + def resumeContext(context: AnyRef): Unit + def isEnabled: Boolean } /** @@ -28,4 +31,7 @@ import akka.annotation.InternalApi private[akka] final class ContextPropagationImpl extends ContextPropagation { def suspendContext(): Unit = () def resumeContext(): Unit = () + def currentContext(): AnyRef = null + def resumeContext(context: AnyRef): Unit = () + def isEnabled: Boolean = false } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index b43fe90d64..eef707ec41 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -776,8 +776,10 @@ private final case class SavedIslandData( override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = { val connection = conn(slot) val bufferSize = publisherAttributes.mandatoryAttribute[InputBuffer].max - val boundary = + val boundary = { + //@YG it was: val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max new BatchingActorInputBoundary(bufferSize, shell, publisher, "publisher.in") + } logics.add(boundary) boundary.stageId = logics.size() - 1 boundary.attributes = connection.inOwner.attributes.and(DefaultAttributes.inputBoundary) From d3153904d27595ddbf8c5f2f4b22b9a9975908e8 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Mon, 9 Aug 2021 13:33:32 -0700 Subject: [PATCH 2/5] Add mima-filters for ContextPropagation enhancements. --- .../context-propagation.backwards.excludes | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes diff --git a/akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes new file mode 100644 index 0000000000..fae565fb4b --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes @@ -0,0 +1,4 @@ +# internal +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.ContextPropagation.currentContext") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.ContextPropagation.resumeContext") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.ContextPropagation.isEnabled") \ No newline at end of file From 1b7d1833fc08c90f68a9cc715594d65803bc5c04 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Mon, 9 Aug 2021 14:06:55 -0700 Subject: [PATCH 3/5] Move 2.6.11 mima filters to a proper folder --- .../pr-29086-rename-grouped.excludes | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename akka-stream/src/main/mima-filters/{2.6.11.backwards.excludes => 2.6.12.backwards.excludes}/pr-29086-rename-grouped.excludes (100%) diff --git a/akka-stream/src/main/mima-filters/2.6.11.backwards.excludes/pr-29086-rename-grouped.excludes b/akka-stream/src/main/mima-filters/2.6.12.backwards.excludes/pr-29086-rename-grouped.excludes similarity index 100% rename from akka-stream/src/main/mima-filters/2.6.11.backwards.excludes/pr-29086-rename-grouped.excludes rename to akka-stream/src/main/mima-filters/2.6.12.backwards.excludes/pr-29086-rename-grouped.excludes From ad86f34462940a8db1405da8144193c7d9bc34a1 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Mon, 9 Aug 2021 14:36:04 -0700 Subject: [PATCH 4/5] Clean up --- .../context-propagation.backwards.excludes | 3 +-- .../src/main/scala/akka/stream/impl/ContextPropagation.scala | 2 -- .../akka/stream/impl/PhasedFusingActorMaterializer.scala | 4 +--- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes index fae565fb4b..839f4fa6ac 100644 --- a/akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes @@ -1,4 +1,3 @@ # internal ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.ContextPropagation.currentContext") -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.ContextPropagation.resumeContext") -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.ContextPropagation.isEnabled") \ No newline at end of file +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.ContextPropagation.resumeContext") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala b/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala index ca6d265661..12d3ae5a6e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala @@ -14,7 +14,6 @@ import akka.annotation.InternalApi def resumeContext(): Unit def currentContext(): AnyRef def resumeContext(context: AnyRef): Unit - def isEnabled: Boolean } /** @@ -33,5 +32,4 @@ private[akka] final class ContextPropagationImpl extends ContextPropagation { def resumeContext(): Unit = () def currentContext(): AnyRef = null def resumeContext(context: AnyRef): Unit = () - def isEnabled: Boolean = false } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index eef707ec41..b43fe90d64 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -776,10 +776,8 @@ private final case class SavedIslandData( override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = { val connection = conn(slot) val bufferSize = publisherAttributes.mandatoryAttribute[InputBuffer].max - val boundary = { - //@YG it was: val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max + val boundary = new BatchingActorInputBoundary(bufferSize, shell, publisher, "publisher.in") - } logics.add(boundary) boundary.stageId = logics.size() - 1 boundary.attributes = connection.inOwner.attributes.and(DefaultAttributes.inputBoundary) From fdbf706b3a77baff99d0f34c8abc5e09f69e2f20 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Wed, 11 Aug 2021 08:38:45 -0700 Subject: [PATCH 5/5] Make ContextPropagation public to be used from custom akka stream stage implementations outside of the akka codebase --- .../src/main/scala/akka/stream/impl/ContextPropagation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala b/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala index 12d3ae5a6e..32d9435419 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ContextPropagation.scala @@ -9,7 +9,7 @@ import akka.annotation.InternalApi /** * INTERNAL API */ -@InternalApi private[akka] trait ContextPropagation { +@InternalApi trait ContextPropagation { def suspendContext(): Unit def resumeContext(): Unit def currentContext(): AnyRef @@ -19,7 +19,7 @@ import akka.annotation.InternalApi /** * INTERNAL API */ -@InternalApi private[akka] object ContextPropagation { +@InternalApi object ContextPropagation { /** * INTERNAL API