diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index bc41af3e91..84c15a2df5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -290,7 +290,7 @@ private final case class SavedIslandData( if (Debug) println(s" cross island forward wiring from port ${forwardWire.from} wired to local slot = $localInSlot") val publisher = forwardWire.phase.createPublisher(forwardWire.from, forwardWire.outStage) - currentPhase.takePublisher(localInSlot, publisher) + currentPhase.takePublisher(localInSlot, publisher, null) } } @@ -338,7 +338,7 @@ private final case class SavedIslandData( } else { if (Debug) println(s" cross-island wiring to local slot $localInSlot in target island") val publisher = currentPhase.createPublisher(out, logic) - targetSegment.phase.takePublisher(localInSlot, publisher) + targetSegment.phase.takePublisher(localInSlot, publisher, null) } } } else { @@ -659,7 +659,7 @@ private final case class SavedIslandData( def createPublisher(out: OutPort, logic: M): Publisher[Any] @InternalStableApi - def takePublisher(slot: Int, publisher: Publisher[Any]): Unit + def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit def onIslandReady(): Unit @@ -767,7 +767,7 @@ private final case class SavedIslandData( boundary.publisher } - override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { + override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit = { val connection = conn(slot) val bufferSize = connection.inOwner.attributes.mandatoryAttribute[InputBuffer].max val boundary = @@ -866,7 +866,7 @@ private final case class SavedIslandData( override def createPublisher(out: OutPort, logic: Publisher[Any]): Publisher[Any] = logic - override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = + override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit = throw new UnsupportedOperationException("A Source cannot take a Publisher") override def onIslandReady(): Unit = () @@ -903,7 +903,7 @@ private final case class SavedIslandData( throw new UnsupportedOperationException("A Sink cannot create a Publisher") } - override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { + override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit = { subscriberOrVirtualPublisher match { case v: VirtualPublisher[_] => v.registerPublisher(publisher) case s: Subscriber[Any] @unchecked => publisher.subscribe(s) @@ -936,7 +936,8 @@ private final case class SavedIslandData( override def assignPort(out: OutPort, slot: Int, logic: Processor[Any, Any]): Unit = () override def createPublisher(out: OutPort, logic: Processor[Any, Any]): Publisher[Any] = logic - override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = publisher.subscribe(processor) + override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit = + publisher.subscribe(processor) override def onIslandReady(): Unit = () } @@ -978,7 +979,7 @@ private final case class SavedIslandData( def createPublisher(out: OutPort, logic: NotUsed): Publisher[Any] = publishers(out.id) - def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = + override def takePublisher(slot: Int, publisher: Publisher[Any], attributes: Attributes): Unit = publisher.subscribe(FanIn.SubInput[Any](tlsActor, 1 - slot)) def onIslandReady(): Unit = ()