diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala index 9f912d5624..8c508ed4f1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala @@ -5,18 +5,18 @@ package akka.stream.scaladsl import java.util.concurrent.{ CompletionStage, TimeUnit } - import scala.annotation.nowarn import com.typesafe.config.ConfigFactory - import akka.{ Done, NotUsed } import akka.actor.ActorSystem import akka.dispatch.Dispatchers +import akka.stream.ActorAttributes.Dispatcher import akka.stream._ import akka.stream.Attributes._ import akka.stream.javadsl import akka.stream.stage._ +import akka.stream.snapshot.MaterializerState import akka.stream.testkit._ import akka.testkit.TestKit @@ -355,6 +355,28 @@ class AttributesSpec dispatcher should startWith("AttributesSpec-my-dispatcher") } + + // We reverted fix #30076 for this since it had too many side effects, we need something specifically for fromPublisher() + "get input buffer size from surrounding .addAttributes for Source.fromPublisher" in pendingUntilFixed { + val materializer = Materializer(system) // for isolation + try { + val pub = TestPublisher.probe() + Source.fromPublisher(pub).withAttributes(Attributes.inputBuffer(1, 1)).run()(materializer) + + val streamSnapshot = awaitAssert { + val snapshot = MaterializerState.streamSnapshots(materializer).futureValue + snapshot should have size (1) // just the one island in this case + snapshot.head + } + + val logics = streamSnapshot.activeInterpreters.head.logics + val inputBoundary = logics.find(_.label.startsWith("BatchingActorInputBoundary")).get + inputBoundary.label should include("fill=0/1,") // dodgy but see no other way to inspect from snapshot + + } finally { + materializer.shutdown() + } + } } "attributes on a Flow" must { @@ -404,6 +426,117 @@ class AttributesSpec firstWhatever shouldBe Some(WhateverAttribute("replaced")) } + "get input buffer size from surrounding .addAttributes (closest)" in { + val materializer = Materializer(system) // for isolation + try { + val (sourcePromise, complete) = Source.maybe + .viaMat( + Flow[Int] + .map { n => + // something else than identity so it's not optimized away + n + } + .async(Dispatchers.DefaultBlockingDispatcherId) + .addAttributes(Attributes.inputBuffer(1, 1)))(Keep.left) + .toMat(Sink.ignore)(Keep.both) + .run()(materializer) + + val snapshot = awaitAssert { + val snapshot = MaterializerState.streamSnapshots(materializer).futureValue + snapshot should have size (2) // two stream "islands", one on blocking dispatcher and one on default + snapshot + } + + val islandByDispatcher = + snapshot.groupBy(_.activeInterpreters.head.logics.head.attributes.mandatoryAttribute[Dispatcher]) + + val logicsOnBlocking = + islandByDispatcher(Dispatcher(Dispatchers.DefaultBlockingDispatcherId)).head.activeInterpreters.head.logics + val blockingInputBoundary = logicsOnBlocking.find(_.label.startsWith("BatchingActorInputBoundary")).get + blockingInputBoundary.label should include("fill=0/1,") // dodgy but see no other way to inspect from snapshot + + sourcePromise.success(None) + complete.futureValue // block until stream completes + } finally { + materializer.shutdown() + } + } + + "get input buffer size from surrounding .addAttributes (wrapping)" in { + val materializer = Materializer(system) // for isolation + try { + val (sourcePromise, complete) = Source.maybe + .viaMat(Flow[Int] + .map { n => + // something else than identity so it's not optimized away + n + } + .async(Dispatchers.DefaultBlockingDispatcherId))(Keep.left) + .addAttributes(Attributes.inputBuffer(1, 1)) + .toMat(Sink.ignore)(Keep.both) + .run()(SystemMaterializer(system).materializer) + + val snapshot = awaitAssert { + val snapshot = MaterializerState.streamSnapshots(system).futureValue + snapshot should have size (2) // two stream "islands", one on blocking dispatcher and one on default + snapshot + } + + val islandByDispatcher = + snapshot.groupBy(_.activeInterpreters.head.logics.head.attributes.mandatoryAttribute[Dispatcher]) + + val logicsOnBlocking = + islandByDispatcher(Dispatcher(Dispatchers.DefaultBlockingDispatcherId)).head.activeInterpreters.head.logics + val blockingInputBoundary = logicsOnBlocking.find(_.label.startsWith("BatchingActorInputBoundary")).get + blockingInputBoundary.label should include("fill=0/1,") // dodgy but see no other way to inspect from snapshot + + sourcePromise.success(None) + complete.futureValue // block until stream completes + } finally { + materializer.shutdown() + } + } + + "get input buffer size from async(dispatcher, inputBufferSize)" in { + val materializer = Materializer(system) // for isolation + try { + val (sourcePromise, complete) = Source.maybe + .viaMat(Flow[Int] + .map { n => + // something else than identity so it's not optimized away + n + } + .async(Dispatchers.DefaultBlockingDispatcherId, 1))(Keep.left) + .toMat(Sink.ignore)(Keep.both) + .run()(SystemMaterializer(system).materializer) + + val snapshot = awaitAssert { + val snapshot = MaterializerState.streamSnapshots(system).futureValue + snapshot should have size (2) // two stream "islands", one on blocking dispatcher and one on default + snapshot + } + + val islandByDispatcher = + snapshot.groupBy(_.activeInterpreters.head.logics.head.attributes.mandatoryAttribute[Dispatcher]) + + val logicsOnBlocking = + islandByDispatcher(Dispatcher(Dispatchers.DefaultBlockingDispatcherId)).head.activeInterpreters.head.logics + val blockingInputBoundary = logicsOnBlocking.find(_.label.startsWith("BatchingActorInputBoundary")).get + blockingInputBoundary.label should include("fill=0/1,") // dodgy but see no other way to inspect from snapshot + + println(blockingInputBoundary) + /* snapshot.foreach(streamSnapshot => + println("RUNNING:\n\t" + + streamSnapshot.activeInterpreters.head.logics.map(l => l.label -> l.attributes).mkString("\n\t")) + ) */ + + sourcePromise.success(None) + complete.futureValue // block until stream completes + } finally { + materializer.shutdown() + } + } + } "attributes on a Sink" must { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceFromPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceFromPublisherSpec.scala index c7a83024ab..76ee7f9412 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceFromPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceFromPublisherSpec.scala @@ -18,8 +18,8 @@ class SourceFromPublisherSpec with Matchers { "Source.fromPublisher" should { - // https://github.com/akka/akka/issues/30076 - "consider 'inputBuffer' attributes in a correct way" in { + // https://github.com/akka/akka/pull/31129 + "consider 'inputBuffer' attributes in a correct way" in pendingUntilFixed { val publisher = TestPublisher.probe[Int]() Source.fromPublisher(publisher).addAttributes(Attributes.inputBuffer(1, 2)).runWith(Sink.ignore) publisher.expectRequest() should ===(2L) diff --git a/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/issue-30076-added-extra-parameter.excludes b/akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/issue-30076-revert.excludes similarity index 54% rename from akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/issue-30076-added-extra-parameter.excludes rename to akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/issue-30076-revert.excludes index 98158cb24e..2a1670fa7d 100644 --- a/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/issue-30076-added-extra-parameter.excludes +++ b/akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/issue-30076-revert.excludes @@ -1,6 +1,4 @@ -# disable compatibility check for @InternalApi and @InternalStableApi. -# Added extra parameter to PhaseIsland#takePublisher, so corresponding all subclasses are failing with -# backward compatibility check. Disabling compatibility check. +# internal api changes reverted ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.*takePublisher") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.IslandTracking.wireOut") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.PhaseIsland.takePublisher") diff --git a/akka-stream/src/main/mima-filters/2.6.19.backwards.excludes/issue-30076-revert.excludes b/akka-stream/src/main/mima-filters/2.6.19.backwards.excludes/issue-30076-revert.excludes new file mode 100644 index 0000000000..86e2d9233d --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.19.backwards.excludes/issue-30076-revert.excludes @@ -0,0 +1,4 @@ +# internal api changes reverted +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.*takePublisher") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.IslandTracking.wireOut") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.PhaseIsland.takePublisher") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 56b9beb867..bc41af3e91 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -290,18 +290,14 @@ private final case class SavedIslandData( if (Debug) println(s" cross island forward wiring from port ${forwardWire.from} wired to local slot = $localInSlot") val publisher = forwardWire.phase.createPublisher(forwardWire.from, forwardWire.outStage) - currentPhase.takePublisher(localInSlot, publisher, attributes) + currentPhase.takePublisher(localInSlot, publisher) } } currentGlobalOffset += 1 } - @InternalApi private[akka] def wireOut( - out: OutPort, - absoluteOffset: Int, - logic: Any, - attributes: Attributes): Unit = { + @InternalApi private[akka] def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = { if (Debug) println(s" wiring $out to absolute = $absoluteOffset") // <-------- backwards, visited stuff @@ -342,7 +338,7 @@ private final case class SavedIslandData( } else { if (Debug) println(s" cross-island wiring to local slot $localInSlot in target island") val publisher = currentPhase.createPublisher(out, logic) - targetSegment.phase.takePublisher(localInSlot, publisher, attributes) + targetSegment.phase.takePublisher(localInSlot, publisher) } } } else { @@ -499,8 +495,7 @@ private final case class SavedIslandData( current match { case MaterializeAtomic(mod, outToSlot) => if (Debug) println(s"materializing module: $mod") - val stageAttributes = attributesStack.getLast - val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, stageAttributes) + val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, attributesStack.getLast) val logic = matAndStage._1 val matValue = matAndStage._2 if (Debug) println(s" materialized value is $matValue") @@ -509,7 +504,7 @@ private final case class SavedIslandData( val stageGlobalOffset = islandTracking.getCurrentOffset wireInlets(islandTracking, mod, logic) - wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot, stageAttributes) + wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot) if (Debug) println(s"PUSH: $matValue => $matValueStack") @@ -588,8 +583,7 @@ private final case class SavedIslandData( mod: StreamLayout.AtomicModule[Shape, Any], logic: Any, stageGlobalOffset: Int, - outToSlot: Array[Int], - stageAttributes: Attributes): Unit = { + outToSlot: Array[Int]): Unit = { val outlets = mod.shape.outlets if (outlets.nonEmpty) { if (Shape.hasOnePort(outlets)) { @@ -597,14 +591,14 @@ private final case class SavedIslandData( val out = outlets.head val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id) if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}") - islandTracking.wireOut(out, absoluteTargetSlot, logic, stageAttributes) + islandTracking.wireOut(out, absoluteTargetSlot, logic) } else { val outs = outlets.iterator while (outs.hasNext) { val out = outs.next() val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id) if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}") - islandTracking.wireOut(out, absoluteTargetSlot, logic, stageAttributes) + islandTracking.wireOut(out, absoluteTargetSlot, logic) } } } @@ -665,7 +659,7 @@ private final case class SavedIslandData( def createPublisher(out: OutPort, logic: M): Publisher[Any] @InternalStableApi - def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit + def takePublisher(slot: Int, publisher: Publisher[Any]): Unit def onIslandReady(): Unit @@ -773,9 +767,9 @@ private final case class SavedIslandData( boundary.publisher } - override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = { + override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { val connection = conn(slot) - val bufferSize = publisherAttributes.mandatoryAttribute[InputBuffer].max + val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max val boundary = new BatchingActorInputBoundary(bufferSize, shell, publisher, "publisher.in") logics.add(boundary) @@ -872,7 +866,7 @@ private final case class SavedIslandData( override def createPublisher(out: OutPort, logic: Publisher[Any]): Publisher[Any] = logic - override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit = + override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = throw new UnsupportedOperationException("A Source cannot take a Publisher") override def onIslandReady(): Unit = () @@ -909,7 +903,7 @@ private final case class SavedIslandData( throw new UnsupportedOperationException("A Sink cannot create a Publisher") } - override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = { + override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { subscriberOrVirtualPublisher match { case v: VirtualPublisher[_] => v.registerPublisher(publisher) case s: Subscriber[Any] @unchecked => publisher.subscribe(s) @@ -942,8 +936,7 @@ private final case class SavedIslandData( override def assignPort(out: OutPort, slot: Int, logic: Processor[Any, Any]): Unit = () override def createPublisher(out: OutPort, logic: Processor[Any, Any]): Publisher[Any] = logic - override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = - publisher.subscribe(processor) + override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor) override def onIslandReady(): Unit = () } @@ -985,7 +978,7 @@ private final case class SavedIslandData( def createPublisher(out: OutPort, logic: NotUsed): Publisher[Any] = publishers(out.id) - def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = + def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(FanIn.SubInput[Any](tlsActor, 1 - slot)) def onIslandReady(): Unit = () diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 930d34f466..b45db7e4ef 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -17,7 +17,6 @@ import akka.stream.Attributes._ // reusable common attributes val IODispatcher = ActorAttributes.IODispatcher val inputBufferOne = inputBuffer(initial = 1, max = 1) - val inputBufferZero = inputBuffer(initial = 0, max = 0) // stage specific default attributes val fused = name("fused") @@ -136,7 +135,7 @@ import akka.stream.Attributes._ val publisherSink = name("publisherSink") val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") - val neverSink = name("neverSink") and inputBufferZero + val neverSink = name("neverSink") val actorRefSink = name("actorRefSink") val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink") val actorSubscriberSink = name("actorSubscriberSink")