Revert #30195 - Source.fromPublisher does not pick up attributes fix (#31129)

* Revert "Source.fromPublisher does not pick up attributes fix #30076 (#30195)"

This reverts commit a887c63f8a.

* MiMa filters

* Add tests from #31015

* Resolving some conflicts and failures this now triggers
This commit is contained in:
Johan Andrén 2022-05-06 16:48:51 +02:00 committed by GitHub
parent e7e95f5c75
commit b7d88fba8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 158 additions and 31 deletions

View file

@ -5,18 +5,18 @@
package akka.stream.scaladsl
import java.util.concurrent.{ CompletionStage, TimeUnit }
import scala.annotation.nowarn
import com.typesafe.config.ConfigFactory
import akka.{ Done, NotUsed }
import akka.actor.ActorSystem
import akka.dispatch.Dispatchers
import akka.stream.ActorAttributes.Dispatcher
import akka.stream._
import akka.stream.Attributes._
import akka.stream.javadsl
import akka.stream.stage._
import akka.stream.snapshot.MaterializerState
import akka.stream.testkit._
import akka.testkit.TestKit
@ -355,6 +355,28 @@ class AttributesSpec
dispatcher should startWith("AttributesSpec-my-dispatcher")
}
// We reverted fix #30076 for this since it had too many side effects, we need something specifically for fromPublisher()
"get input buffer size from surrounding .addAttributes for Source.fromPublisher" in pendingUntilFixed {
val materializer = Materializer(system) // for isolation
try {
val pub = TestPublisher.probe()
Source.fromPublisher(pub).withAttributes(Attributes.inputBuffer(1, 1)).run()(materializer)
val streamSnapshot = awaitAssert {
val snapshot = MaterializerState.streamSnapshots(materializer).futureValue
snapshot should have size (1) // just the one island in this case
snapshot.head
}
val logics = streamSnapshot.activeInterpreters.head.logics
val inputBoundary = logics.find(_.label.startsWith("BatchingActorInputBoundary")).get
inputBoundary.label should include("fill=0/1,") // dodgy but see no other way to inspect from snapshot
} finally {
materializer.shutdown()
}
}
}
"attributes on a Flow" must {
@ -404,6 +426,117 @@ class AttributesSpec
firstWhatever shouldBe Some(WhateverAttribute("replaced"))
}
"get input buffer size from surrounding .addAttributes (closest)" in {
val materializer = Materializer(system) // for isolation
try {
val (sourcePromise, complete) = Source.maybe
.viaMat(
Flow[Int]
.map { n =>
// something else than identity so it's not optimized away
n
}
.async(Dispatchers.DefaultBlockingDispatcherId)
.addAttributes(Attributes.inputBuffer(1, 1)))(Keep.left)
.toMat(Sink.ignore)(Keep.both)
.run()(materializer)
val snapshot = awaitAssert {
val snapshot = MaterializerState.streamSnapshots(materializer).futureValue
snapshot should have size (2) // two stream "islands", one on blocking dispatcher and one on default
snapshot
}
val islandByDispatcher =
snapshot.groupBy(_.activeInterpreters.head.logics.head.attributes.mandatoryAttribute[Dispatcher])
val logicsOnBlocking =
islandByDispatcher(Dispatcher(Dispatchers.DefaultBlockingDispatcherId)).head.activeInterpreters.head.logics
val blockingInputBoundary = logicsOnBlocking.find(_.label.startsWith("BatchingActorInputBoundary")).get
blockingInputBoundary.label should include("fill=0/1,") // dodgy but see no other way to inspect from snapshot
sourcePromise.success(None)
complete.futureValue // block until stream completes
} finally {
materializer.shutdown()
}
}
"get input buffer size from surrounding .addAttributes (wrapping)" in {
val materializer = Materializer(system) // for isolation
try {
val (sourcePromise, complete) = Source.maybe
.viaMat(Flow[Int]
.map { n =>
// something else than identity so it's not optimized away
n
}
.async(Dispatchers.DefaultBlockingDispatcherId))(Keep.left)
.addAttributes(Attributes.inputBuffer(1, 1))
.toMat(Sink.ignore)(Keep.both)
.run()(SystemMaterializer(system).materializer)
val snapshot = awaitAssert {
val snapshot = MaterializerState.streamSnapshots(system).futureValue
snapshot should have size (2) // two stream "islands", one on blocking dispatcher and one on default
snapshot
}
val islandByDispatcher =
snapshot.groupBy(_.activeInterpreters.head.logics.head.attributes.mandatoryAttribute[Dispatcher])
val logicsOnBlocking =
islandByDispatcher(Dispatcher(Dispatchers.DefaultBlockingDispatcherId)).head.activeInterpreters.head.logics
val blockingInputBoundary = logicsOnBlocking.find(_.label.startsWith("BatchingActorInputBoundary")).get
blockingInputBoundary.label should include("fill=0/1,") // dodgy but see no other way to inspect from snapshot
sourcePromise.success(None)
complete.futureValue // block until stream completes
} finally {
materializer.shutdown()
}
}
"get input buffer size from async(dispatcher, inputBufferSize)" in {
val materializer = Materializer(system) // for isolation
try {
val (sourcePromise, complete) = Source.maybe
.viaMat(Flow[Int]
.map { n =>
// something else than identity so it's not optimized away
n
}
.async(Dispatchers.DefaultBlockingDispatcherId, 1))(Keep.left)
.toMat(Sink.ignore)(Keep.both)
.run()(SystemMaterializer(system).materializer)
val snapshot = awaitAssert {
val snapshot = MaterializerState.streamSnapshots(system).futureValue
snapshot should have size (2) // two stream "islands", one on blocking dispatcher and one on default
snapshot
}
val islandByDispatcher =
snapshot.groupBy(_.activeInterpreters.head.logics.head.attributes.mandatoryAttribute[Dispatcher])
val logicsOnBlocking =
islandByDispatcher(Dispatcher(Dispatchers.DefaultBlockingDispatcherId)).head.activeInterpreters.head.logics
val blockingInputBoundary = logicsOnBlocking.find(_.label.startsWith("BatchingActorInputBoundary")).get
blockingInputBoundary.label should include("fill=0/1,") // dodgy but see no other way to inspect from snapshot
println(blockingInputBoundary)
/* snapshot.foreach(streamSnapshot =>
println("RUNNING:\n\t" +
streamSnapshot.activeInterpreters.head.logics.map(l => l.label -> l.attributes).mkString("\n\t"))
) */
sourcePromise.success(None)
complete.futureValue // block until stream completes
} finally {
materializer.shutdown()
}
}
}
"attributes on a Sink" must {

View file

@ -18,8 +18,8 @@ class SourceFromPublisherSpec
with Matchers {
"Source.fromPublisher" should {
// https://github.com/akka/akka/issues/30076
"consider 'inputBuffer' attributes in a correct way" in {
// https://github.com/akka/akka/pull/31129
"consider 'inputBuffer' attributes in a correct way" in pendingUntilFixed {
val publisher = TestPublisher.probe[Int]()
Source.fromPublisher(publisher).addAttributes(Attributes.inputBuffer(1, 2)).runWith(Sink.ignore)
publisher.expectRequest() should ===(2L)

View file

@ -1,6 +1,4 @@
# disable compatibility check for @InternalApi and @InternalStableApi.
# Added extra parameter to PhaseIsland#takePublisher, so corresponding all subclasses are failing with
# backward compatibility check. Disabling compatibility check.
# internal api changes reverted
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.*takePublisher")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.IslandTracking.wireOut")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.PhaseIsland.takePublisher")

View file

@ -0,0 +1,4 @@
# internal api changes reverted
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.*takePublisher")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.IslandTracking.wireOut")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.PhaseIsland.takePublisher")

View file

@ -290,18 +290,14 @@ private final case class SavedIslandData(
if (Debug)
println(s" cross island forward wiring from port ${forwardWire.from} wired to local slot = $localInSlot")
val publisher = forwardWire.phase.createPublisher(forwardWire.from, forwardWire.outStage)
currentPhase.takePublisher(localInSlot, publisher, attributes)
currentPhase.takePublisher(localInSlot, publisher)
}
}
currentGlobalOffset += 1
}
@InternalApi private[akka] def wireOut(
out: OutPort,
absoluteOffset: Int,
logic: Any,
attributes: Attributes): Unit = {
@InternalApi private[akka] def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = {
if (Debug) println(s" wiring $out to absolute = $absoluteOffset")
// <-------- backwards, visited stuff
@ -342,7 +338,7 @@ private final case class SavedIslandData(
} else {
if (Debug) println(s" cross-island wiring to local slot $localInSlot in target island")
val publisher = currentPhase.createPublisher(out, logic)
targetSegment.phase.takePublisher(localInSlot, publisher, attributes)
targetSegment.phase.takePublisher(localInSlot, publisher)
}
}
} else {
@ -499,8 +495,7 @@ private final case class SavedIslandData(
current match {
case MaterializeAtomic(mod, outToSlot) =>
if (Debug) println(s"materializing module: $mod")
val stageAttributes = attributesStack.getLast
val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, stageAttributes)
val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, attributesStack.getLast)
val logic = matAndStage._1
val matValue = matAndStage._2
if (Debug) println(s" materialized value is $matValue")
@ -509,7 +504,7 @@ private final case class SavedIslandData(
val stageGlobalOffset = islandTracking.getCurrentOffset
wireInlets(islandTracking, mod, logic)
wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot, stageAttributes)
wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot)
if (Debug) println(s"PUSH: $matValue => $matValueStack")
@ -588,8 +583,7 @@ private final case class SavedIslandData(
mod: StreamLayout.AtomicModule[Shape, Any],
logic: Any,
stageGlobalOffset: Int,
outToSlot: Array[Int],
stageAttributes: Attributes): Unit = {
outToSlot: Array[Int]): Unit = {
val outlets = mod.shape.outlets
if (outlets.nonEmpty) {
if (Shape.hasOnePort(outlets)) {
@ -597,14 +591,14 @@ private final case class SavedIslandData(
val out = outlets.head
val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id)
if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}")
islandTracking.wireOut(out, absoluteTargetSlot, logic, stageAttributes)
islandTracking.wireOut(out, absoluteTargetSlot, logic)
} else {
val outs = outlets.iterator
while (outs.hasNext) {
val out = outs.next()
val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id)
if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}")
islandTracking.wireOut(out, absoluteTargetSlot, logic, stageAttributes)
islandTracking.wireOut(out, absoluteTargetSlot, logic)
}
}
}
@ -665,7 +659,7 @@ private final case class SavedIslandData(
def createPublisher(out: OutPort, logic: M): Publisher[Any]
@InternalStableApi
def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit
def takePublisher(slot: Int, publisher: Publisher[Any]): Unit
def onIslandReady(): Unit
@ -773,9 +767,9 @@ private final case class SavedIslandData(
boundary.publisher
}
override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = {
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = {
val connection = conn(slot)
val bufferSize = publisherAttributes.mandatoryAttribute[InputBuffer].max
val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max
val boundary =
new BatchingActorInputBoundary(bufferSize, shell, publisher, "publisher.in")
logics.add(boundary)
@ -872,7 +866,7 @@ private final case class SavedIslandData(
override def createPublisher(out: OutPort, logic: Publisher[Any]): Publisher[Any] = logic
override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit =
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit =
throw new UnsupportedOperationException("A Source cannot take a Publisher")
override def onIslandReady(): Unit = ()
@ -909,7 +903,7 @@ private final case class SavedIslandData(
throw new UnsupportedOperationException("A Sink cannot create a Publisher")
}
override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = {
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = {
subscriberOrVirtualPublisher match {
case v: VirtualPublisher[_] => v.registerPublisher(publisher)
case s: Subscriber[Any] @unchecked => publisher.subscribe(s)
@ -942,8 +936,7 @@ private final case class SavedIslandData(
override def assignPort(out: OutPort, slot: Int, logic: Processor[Any, Any]): Unit = ()
override def createPublisher(out: OutPort, logic: Processor[Any, Any]): Publisher[Any] = logic
override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit =
publisher.subscribe(processor)
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor)
override def onIslandReady(): Unit = ()
}
@ -985,7 +978,7 @@ private final case class SavedIslandData(
def createPublisher(out: OutPort, logic: NotUsed): Publisher[Any] =
publishers(out.id)
def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit =
def takePublisher(slot: Int, publisher: Publisher[Any]): Unit =
publisher.subscribe(FanIn.SubInput[Any](tlsActor, 1 - slot))
def onIslandReady(): Unit = ()

View file

@ -17,7 +17,6 @@ import akka.stream.Attributes._
// reusable common attributes
val IODispatcher = ActorAttributes.IODispatcher
val inputBufferOne = inputBuffer(initial = 1, max = 1)
val inputBufferZero = inputBuffer(initial = 0, max = 0)
// stage specific default attributes
val fused = name("fused")
@ -136,7 +135,7 @@ import akka.stream.Attributes._
val publisherSink = name("publisherSink")
val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink")
val neverSink = name("neverSink") and inputBufferZero
val neverSink = name("neverSink")
val actorRefSink = name("actorRefSink")
val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink")
val actorSubscriberSink = name("actorSubscriberSink")