diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceFromPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceFromPublisherSpec.scala new file mode 100644 index 0000000000..ba43434354 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceFromPublisherSpec.scala @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.actor.ActorSystem +import akka.stream.Attributes +import akka.stream.testkit.TestPublisher +import akka.testkit.TestKit +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpecLike + +class SourceFromPublisherSpec + extends TestKit(ActorSystem("source-from-publisher-spec")) + with AsyncWordSpecLike + with Matchers { + + "Source.fromPublisher" should { + // https://github.com/akka/akka/issues/30076 + "consider 'inputBuffer' attributes in a correct way" in { + val publisher = TestPublisher.probe[Int]() + Source.fromPublisher(publisher).addAttributes(Attributes.inputBuffer(1, 2)).runWith(Sink.ignore) + publisher.expectRequest() should ===(2L) + } + } +} diff --git a/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/issue-30076-added-extra-parameter.excludes b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/issue-30076-added-extra-parameter.excludes new file mode 100644 index 0000000000..98158cb24e --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/issue-30076-added-extra-parameter.excludes @@ -0,0 +1,6 @@ +# disable compatibility check for @InternalApi and @InternalStableApi. +# Added extra parameter to PhaseIsland#takePublisher, so corresponding all subclasses are failing with +# backward compatibility check. Disabling compatibility check. +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.*takePublisher") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.IslandTracking.wireOut") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.PhaseIsland.takePublisher") diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 9c0f1dbb2d..7d7a0eb495 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -290,14 +290,18 @@ private final case class SavedIslandData( if (Debug) println(s" cross island forward wiring from port ${forwardWire.from} wired to local slot = $localInSlot") val publisher = forwardWire.phase.createPublisher(forwardWire.from, forwardWire.outStage) - currentPhase.takePublisher(localInSlot, publisher) + currentPhase.takePublisher(localInSlot, publisher, attributes) } } currentGlobalOffset += 1 } - @InternalApi private[akka] def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = { + @InternalApi private[akka] def wireOut( + out: OutPort, + absoluteOffset: Int, + logic: Any, + attributes: Attributes): Unit = { if (Debug) println(s" wiring $out to absolute = $absoluteOffset") // <-------- backwards, visited stuff @@ -338,7 +342,7 @@ private final case class SavedIslandData( } else { if (Debug) println(s" cross-island wiring to local slot $localInSlot in target island") val publisher = currentPhase.createPublisher(out, logic) - targetSegment.phase.takePublisher(localInSlot, publisher) + targetSegment.phase.takePublisher(localInSlot, publisher, attributes) } } } else { @@ -495,7 +499,8 @@ private final case class SavedIslandData( current match { case MaterializeAtomic(mod, outToSlot) => if (Debug) println(s"materializing module: $mod") - val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, attributesStack.getLast) + val stageAttributes = attributesStack.getLast + val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, stageAttributes) val logic = matAndStage._1 val matValue = matAndStage._2 if (Debug) println(s" materialized value is $matValue") @@ -504,7 +509,7 @@ private final case class SavedIslandData( val stageGlobalOffset = islandTracking.getCurrentOffset wireInlets(islandTracking, mod, logic) - wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot) + wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot, stageAttributes) if (Debug) println(s"PUSH: $matValue => $matValueStack") @@ -583,7 +588,8 @@ private final case class SavedIslandData( mod: StreamLayout.AtomicModule[Shape, Any], logic: Any, stageGlobalOffset: Int, - outToSlot: Array[Int]): Unit = { + outToSlot: Array[Int], + stageAttributes: Attributes): Unit = { val outlets = mod.shape.outlets if (outlets.nonEmpty) { if (Shape.hasOnePort(outlets)) { @@ -591,14 +597,14 @@ private final case class SavedIslandData( val out = outlets.head val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id) if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}") - islandTracking.wireOut(out, absoluteTargetSlot, logic) + islandTracking.wireOut(out, absoluteTargetSlot, logic, stageAttributes) } else { val outs = outlets.iterator while (outs.hasNext) { val out = outs.next() val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id) if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}") - islandTracking.wireOut(out, absoluteTargetSlot, logic) + islandTracking.wireOut(out, absoluteTargetSlot, logic, stageAttributes) } } } @@ -659,7 +665,7 @@ private final case class SavedIslandData( def createPublisher(out: OutPort, logic: M): Publisher[Any] @InternalStableApi - def takePublisher(slot: Int, publisher: Publisher[Any]): Unit + def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit def onIslandReady(): Unit @@ -767,9 +773,9 @@ private final case class SavedIslandData( boundary.publisher } - override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { + override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = { val connection = conn(slot) - val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max + val bufferSize = publisherAttributes.mandatoryAttribute[InputBuffer].max val boundary = new BatchingActorInputBoundary(bufferSize, shell, publisher, "publisher.in") logics.add(boundary) @@ -866,7 +872,7 @@ private final case class SavedIslandData( override def createPublisher(out: OutPort, logic: Publisher[Any]): Publisher[Any] = logic - override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = + override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit = throw new UnsupportedOperationException("A Source cannot take a Publisher") override def onIslandReady(): Unit = () @@ -903,7 +909,7 @@ private final case class SavedIslandData( throw new UnsupportedOperationException("A Sink cannot create a Publisher") } - override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { + override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = { subscriberOrVirtualPublisher match { case v: VirtualPublisher[_] => v.registerPublisher(publisher) case s: Subscriber[Any] @unchecked => publisher.subscribe(s) @@ -936,7 +942,8 @@ private final case class SavedIslandData( override def assignPort(out: OutPort, slot: Int, logic: Processor[Any, Any]): Unit = () override def createPublisher(out: OutPort, logic: Processor[Any, Any]): Publisher[Any] = logic - override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor) + override def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = + publisher.subscribe(processor) override def onIslandReady(): Unit = () } @@ -978,7 +985,7 @@ private final case class SavedIslandData( def createPublisher(out: OutPort, logic: NotUsed): Publisher[Any] = publishers(out.id) - def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = + def takePublisher(slot: Int, publisher: Publisher[Any], publisherAttributes: Attributes): Unit = publisher.subscribe(FanIn.SubInput[Any](tlsActor, 1 - slot)) def onIslandReady(): Unit = ()