diff --git a/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes index 479c88973b..3e5be78c0e 100644 --- a/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes @@ -32,3 +32,7 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.Inbound # #22333 Disable Java serialization ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.ThrowableSupport.stackTraceElementBuilder") + +# #27455 Bind to arbitrary port in Artery TCP +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.*.runInboundStreams") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.*.runInboundStreams") diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 1d2345d444..b791a05add 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -454,20 +454,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr else ArteryTransport.autoSelectPort(settings.Canonical.Hostname, udp) } else settings.Canonical.Port - val bindPort = if (settings.Bind.Port == 0) { - if (settings.Canonical.Port == 0) port // canonical and bind ports are zero. Use random port for both - else ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp) - } else settings.Bind.Port - _localAddress = UniqueAddress( Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port), AddressUidExtension(system).longAddressUid) _addresses = Set(_localAddress.address) - _bindAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, bindPort), - AddressUidExtension(system).longAddressUid) - // TODO: This probably needs to be a global value instead of an event as events might rotate out of the log topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString()) @@ -478,7 +469,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr messageDispatcher = new MessageDispatcher(system, provider) topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData) - runInboundStreams() + val boundPort = runInboundStreams() + _bindAddress = UniqueAddress( + Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort), + AddressUidExtension(system).longAddressUid) + topLevelFlightRecorder.loFreq(Transport_StartupFinished, NoMetaData) startRemoveQuarantinedAssociationTask() @@ -496,12 +491,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr bindAddress.address, localAddress.uid) } - } protected def startTransport(): Unit - protected def runInboundStreams(): Unit + protected def runInboundStreams(): Int private def startRemoveQuarantinedAssociationTask(): Unit = { val removeAfter = settings.Advanced.RemoveQuarantinedAssociationAfter diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala index 52f095b092..17e437b2bf 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala @@ -68,7 +68,6 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro private val taskRunner = new TaskRunner(system, settings.Advanced.Aeron.IdleCpuLevel) - private def inboundChannel = s"aeron:udp?endpoint=${bindAddress.address.host.get}:${bindAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" override protected def startTransport(): Unit = { @@ -301,7 +300,10 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro createFlightRecorderEventSink())) } - private def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, AeronSource.AeronLifecycle] = + private def aeronSource( + streamId: Int, + pool: EnvelopeBufferPool, + inboundChannel: String): Source[EnvelopeBuffer, AeronSource.AeronLifecycle] = Source.fromGraph( new AeronSource( inboundChannel, @@ -317,21 +319,42 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro settings.Advanced.Aeron.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels else 50 * settings.Advanced.Aeron.IdleCpuLevel - 240 - override protected def runInboundStreams(): Unit = { - runInboundControlStream() - runInboundOrdinaryMessagesStream() + override protected def runInboundStreams(): Int = { + val bindPort = + (settings.Canonical.Port, settings.Bind.Port) match { + case (0, 0) => + localAddress.address.port match { + case Some(n) if n != 0 => n + case _ => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true) + } + case (0, n) => n + case (_, 0) => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true) + case (_, n) => n + } + if (settings.Bind.Port == 0) + localAddress.address.port match { + case Some(n) if n != 0 => n + case _ => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true) + } else settings.Bind.Port + + val inboundChannel = s"aeron:udp?endpoint=${settings.Bind.Hostname}:$bindPort" + + runInboundControlStream(inboundChannel) + runInboundOrdinaryMessagesStream(inboundChannel) if (largeMessageChannelEnabled) { - runInboundLargeMessagesStream() + runInboundLargeMessagesStream(inboundChannel) } blockUntilChannelActive() + + bindPort } - private def runInboundControlStream(): Unit = { + private def runInboundControlStream(inboundChannel: String): Unit = { if (isShutdown) throw ShuttingDown val (resourceLife, ctrl, completed) = - aeronSource(ControlStreamId, envelopeBufferPool) + aeronSource(ControlStreamId, envelopeBufferPool, inboundChannel) .via(inboundFlow(settings, NoInboundCompressions)) .toMat(inboundControlSink)({ case (a, (c, d)) => (a, c, d) }) .run()(controlMaterializer) @@ -339,15 +362,15 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro attachControlMessageObserver(ctrl) updateStreamMatValues(ControlStreamId, resourceLife, completed) - attachInboundStreamRestart("Inbound control stream", completed, () => runInboundControlStream()) + attachInboundStreamRestart("Inbound control stream", completed, () => runInboundControlStream(inboundChannel)) } - private def runInboundOrdinaryMessagesStream(): Unit = { + private def runInboundOrdinaryMessagesStream(inboundChannel: String): Unit = { if (isShutdown) throw ShuttingDown val (resourceLife, inboundCompressionAccess, completed) = if (inboundLanes == 1) { - aeronSource(OrdinaryStreamId, envelopeBufferPool) + aeronSource(OrdinaryStreamId, envelopeBufferPool, inboundChannel) .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) .toMat(inboundSink(envelopeBufferPool))({ case ((a, b), c) => (a, b, c) }) .run()(materializer) @@ -355,7 +378,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro } else { val laneKillSwitch = KillSwitches.shared("laneKillSwitch") val laneSource: Source[InboundEnvelope, (AeronLifecycle, InboundCompressionAccess)] = - aeronSource(OrdinaryStreamId, envelopeBufferPool) + aeronSource(OrdinaryStreamId, envelopeBufferPool, inboundChannel) .via(laneKillSwitch.flow) .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) .via(Flow.fromGraph(new DuplicateHandshakeReq(inboundLanes, this, system, envelopeBufferPool))) @@ -395,19 +418,25 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro setInboundCompressionAccess(inboundCompressionAccess) updateStreamMatValues(OrdinaryStreamId, resourceLife, completed) - attachInboundStreamRestart("Inbound message stream", completed, () => runInboundOrdinaryMessagesStream()) + attachInboundStreamRestart( + "Inbound message stream", + completed, + () => runInboundOrdinaryMessagesStream(inboundChannel)) } - private def runInboundLargeMessagesStream(): Unit = { + private def runInboundLargeMessagesStream(inboundChannel: String): Unit = { if (isShutdown) throw ShuttingDown - val (resourceLife, completed) = aeronSource(LargeStreamId, largeEnvelopeBufferPool) + val (resourceLife, completed) = aeronSource(LargeStreamId, largeEnvelopeBufferPool, inboundChannel) .via(inboundLargeFlow(settings)) .toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both) .run()(materializer) updateStreamMatValues(LargeStreamId, resourceLife, completed) - attachInboundStreamRestart("Inbound large message stream", completed, () => runInboundLargeMessagesStream()) + attachInboundStreamRestart( + "Inbound large message stream", + completed, + () => runInboundLargeMessagesStream(inboundChannel)) } private def updateStreamMatValues( diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index efecfd786f..8294e36036 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -79,7 +79,7 @@ private[remote] class ArteryTcpTransport( @volatile private var inboundKillSwitch: SharedKillSwitch = KillSwitches.shared("inboundKillSwitch") // may change when inbound streams are restarted @volatile private var inboundStream: OptionVal[Sink[EnvelopeBuffer, NotUsed]] = OptionVal.None - @volatile private var serverBinding: Option[Future[ServerBinding]] = None + @volatile private var serverBinding: Option[ServerBinding] = None private val sslEngineProvider: OptionVal[SSLEngineProvider] = if (tlsEnabled) { @@ -201,8 +201,7 @@ private[remote] class ArteryTcpTransport( .toMat(Sink.ignore)(Keep.right) } - override protected def runInboundStreams(): Unit = { - + override protected def runInboundStreams(): Int = { // Design note: The design of how to run the inbound streams are influenced by the original design // for the Aeron streams, and there we can only have one single inbound since everything comes in // via the single AeronSource. @@ -270,8 +269,13 @@ private[remote] class ArteryTcpTransport( .map(_ => ByteString.empty) // make it a Flow[ByteString] again } - val bindHost = bindAddress.address.host.get - val bindPort = bindAddress.address.port.get + val bindHost = settings.Bind.Hostname + val bindPort = + if (settings.Bind.Port == 0 && settings.Canonical.Port == 0) + localAddress.address.port match { + case Some(n) => n + case _ => 0 + } else settings.Bind.Port val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] = if (tlsEnabled) { @@ -305,9 +309,9 @@ private[remote] class ArteryTcpTransport( }(ExecutionContexts.sameThreadExecutionContext) // only on initial startup, when ActorSystem is starting - Await.result(binding, settings.Bind.BindTimeout) - afr.loFreq(TcpInbound_Bound, s"$bindHost:$bindPort") - Some(binding) + val b = Await.result(binding, settings.Bind.BindTimeout) + afr.loFreq(TcpInbound_Bound, s"$bindHost:${b.localAddress.getPort}") + Some(b) case s @ Some(_) => // already bound, when restarting s @@ -333,6 +337,8 @@ private[remote] class ArteryTcpTransport( } attachInboundStreamRestart("Inbound streams", completed, restart) + + serverBinding.get.localAddress.getPort } private def runInboundControlStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = { @@ -454,8 +460,7 @@ private[remote] class ArteryTcpTransport( case Some(binding) => implicit val ec = system.dispatchers.internalDispatcher for { - b <- binding - _ <- b.unbind() + _ <- binding.unbind() } yield { topLevelFlightRecorder.loFreq( TcpInbound_Bound,