Source.fromPublisher does not pick up attributes fix #30076 (#30195)

This commit is contained in:
Seeta Ramayya 2021-06-01 10:44:40 +02:00 committed by GitHub
parent 364cb06905
commit a887c63f8a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 15 deletions

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.testkit.TestPublisher
import akka.testkit.TestKit
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpecLike
class SourceFromPublisherSpec
extends TestKit(ActorSystem("source-from-publisher-spec"))
with AsyncWordSpecLike
with Matchers {
"Source.fromPublisher" should {
// https://github.com/akka/akka/issues/30076
"consider 'inputBuffer' attributes in a correct way" in {
val publisher = TestPublisher.probe[Int]()
Source.fromPublisher(publisher).addAttributes(Attributes.inputBuffer(1, 2)).runWith(Sink.ignore)
publisher.expectRequest() should ===(2L)
}
}
}

View file

@ -0,0 +1,6 @@
# disable compatibility check for @InternalApi and @InternalStableApi.
# Added extra parameter to PhaseIsland#takePublisher, so corresponding all subclasses are failing with
# backward compatibility check. Disabling compatibility check.
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.*takePublisher")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.IslandTracking.wireOut")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.PhaseIsland.takePublisher")

View file

@ -290,14 +290,18 @@ private final case class SavedIslandData(
if (Debug)
println(s" cross island forward wiring from port ${forwardWire.from} wired to local slot = $localInSlot")
val publisher = forwardWire.phase.createPublisher(forwardWire.from, forwardWire.outStage)
currentPhase.takePublisher(localInSlot, publisher)
currentPhase.takePublisher(localInSlot, publisher, attributes)
}
}
currentGlobalOffset += 1
}
@InternalApi private[akka] def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = {
@InternalApi private[akka] def wireOut(
out: OutPort,
absoluteOffset: Int,
logic: Any,
attributes: Attributes): Unit = {
if (Debug) println(s" wiring $out to absolute = $absoluteOffset")
// <-------- backwards, visited stuff
@ -338,7 +342,7 @@ private final case class SavedIslandData(
} else {
if (Debug) println(s" cross-island wiring to local slot $localInSlot in target island")
val publisher = currentPhase.createPublisher(out, logic)
targetSegment.phase.takePublisher(localInSlot, publisher)
targetSegment.phase.takePublisher(localInSlot, publisher, attributes)
}
}
} else {
@ -495,7 +499,8 @@ private final case class SavedIslandData(
current match {
case MaterializeAtomic(mod, outToSlot) =>
if (Debug) println(s"materializing module: $mod")
val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, attributesStack.getLast)
val stageAttributes = attributesStack.getLast
val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, stageAttributes)
val logic = matAndStage._1
val matValue = matAndStage._2
if (Debug) println(s" materialized value is $matValue")
@ -504,7 +509,7 @@ private final case class SavedIslandData(
val stageGlobalOffset = islandTracking.getCurrentOffset
wireInlets(islandTracking, mod, logic)
wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot)
wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot, stageAttributes)
if (Debug) println(s"PUSH: $matValue => $matValueStack")
@ -583,7 +588,8 @@ private final case class SavedIslandData(
mod: StreamLayout.AtomicModule[Shape, Any],
logic: Any,
stageGlobalOffset: Int,
outToSlot: Array[Int]): Unit = {
outToSlot: Array[Int],
stageAttributes: Attributes): Unit = {
val outlets = mod.shape.outlets
if (outlets.nonEmpty) {
if (Shape.hasOnePort(outlets)) {
@ -591,14 +597,14 @@ private final case class SavedIslandData(
val out = outlets.head
val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id)
if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}")
islandTracking.wireOut(out, absoluteTargetSlot, logic)
islandTracking.wireOut(out, absoluteTargetSlot, logic, stageAttributes)
} else {
val outs = outlets.iterator
while (outs.hasNext) {
val out = outs.next()
val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id)
if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}")
islandTracking.wireOut(out, absoluteTargetSlot, logic)
islandTracking.wireOut(out, absoluteTargetSlot, logic, stageAttributes)
}
}
}
@ -659,7 +665,7 @@ private final case class SavedIslandData(
def createPublisher(out: OutPort, logic: M): Publisher[Any]
@InternalStableApi
def takePublisher(slot: Int, publisher: Publisher[Any]): Unit
def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit
def onIslandReady(): Unit
@ -767,9 +773,9 @@ private final case class SavedIslandData(
boundary.publisher
}
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = {
override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = {
val connection = conn(slot)
val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max
val bufferSize = publisherAttributes.mandatoryAttribute[InputBuffer].max
val boundary =
new BatchingActorInputBoundary(bufferSize, shell, publisher, "publisher.in")
logics.add(boundary)
@ -866,7 +872,7 @@ private final case class SavedIslandData(
override def createPublisher(out: OutPort, logic: Publisher[Any]): Publisher[Any] = logic
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit =
override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit =
throw new UnsupportedOperationException("A Source cannot take a Publisher")
override def onIslandReady(): Unit = ()
@ -903,7 +909,7 @@ private final case class SavedIslandData(
throw new UnsupportedOperationException("A Sink cannot create a Publisher")
}
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = {
override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = {
subscriberOrVirtualPublisher match {
case v: VirtualPublisher[_] => v.registerPublisher(publisher)
case s: Subscriber[Any] @unchecked => publisher.subscribe(s)
@ -936,7 +942,8 @@ private final case class SavedIslandData(
override def assignPort(out: OutPort, slot: Int, logic: Processor[Any, Any]): Unit = ()
override def createPublisher(out: OutPort, logic: Processor[Any, Any]): Publisher[Any] = logic
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor)
override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit =
publisher.subscribe(processor)
override def onIslandReady(): Unit = ()
}
@ -978,7 +985,7 @@ private final case class SavedIslandData(
def createPublisher(out: OutPort, logic: NotUsed): Publisher[Any] =
publishers(out.id)
def takePublisher(slot: Int, publisher: Publisher[Any]): Unit =
def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit =
publisher.subscribe(FanIn.SubInput[Any](tlsActor, 1 - slot))
def onIslandReady(): Unit = ()