diff --git a/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes index c04525ec71..867e69050e 100644 --- a/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes @@ -37,6 +37,9 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.Th ProblemFilters.exclude[Problem]("akka.remote.serialization.ArteryMessageSerializer*") ProblemFilters.exclude[Problem]("akka.remote.*Formats*") -# #27455 Bind to arbitrary port in Artery TCP +# #27455, #27525 Bind to arbitrary port in Artery TCP ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.*.runInboundStreams") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.*.runInboundStreams") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.*.runInboundStreams") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.ArteryTransport.bindInboundStreams") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.autoSelectPort") diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index b791a05add..b2458c6b21 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -4,10 +4,7 @@ package akka.remote.artery -import java.net.InetSocketAddress -import java.nio.channels.DatagramChannel import java.nio.channels.FileChannel -import java.nio.channels.ServerSocketChannel import java.nio.file.Path import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean @@ -447,21 +444,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr startTransport() topLevelFlightRecorder.loFreq(Transport_Started, NoMetaData) - val udp = settings.Transport == ArterySettings.AeronUpd - val port = - if (settings.Canonical.Port == 0) { - if (settings.Bind.Port != 0) settings.Bind.Port // if bind port is set, use bind port instead of random - else ArteryTransport.autoSelectPort(settings.Canonical.Hostname, udp) - } else settings.Canonical.Port - - _localAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port), - AddressUidExtension(system).longAddressUid) - _addresses = Set(_localAddress.address) - - // TODO: This probably needs to be a global value instead of an event as events might rotate out of the log - topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString()) - materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system) controlMaterializer = ActorMaterializer.systemMaterializer(settings.Advanced.ControlStreamMaterializerSettings, "remoteControl", system) @@ -469,11 +451,21 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr messageDispatcher = new MessageDispatcher(system, provider) topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData) - val boundPort = runInboundStreams() + val (port, boundPort) = bindInboundStreams() + + _localAddress = UniqueAddress( + Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port), + AddressUidExtension(system).longAddressUid) + _addresses = Set(_localAddress.address) + _bindAddress = UniqueAddress( Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort), AddressUidExtension(system).longAddressUid) + topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString()) + + runInboundStreams(port, boundPort) + topLevelFlightRecorder.loFreq(Transport_StartupFinished, NoMetaData) startRemoveQuarantinedAssociationTask() @@ -495,7 +487,21 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr protected def startTransport(): Unit - protected def runInboundStreams(): Int + /** + * Bind to the ports for inbound streams. If '0' is specified, this will also select an + * arbitrary free local port. For UDP, we only select the port and leave the actual + * binding to Aeron when running the inbound stream. + * + * After calling this method the 'localAddress' and 'bindAddress' fields can be set. + */ + protected def bindInboundStreams(): (Int, Int) + + /** + * Run the inbound streams that have been previously bound. + * + * Before calling this method the 'localAddress' and 'bindAddress' should have been set. + */ + protected def runInboundStreams(port: Int, bindPort: Int): Unit private def startRemoveQuarantinedAssociationTask(): Unit = { val removeAfter = settings.Advanced.RemoveQuarantinedAssociationAfter @@ -1013,22 +1019,6 @@ private[remote] object ArteryTransport { final case class InboundStreamMatValues[LifeCycle](lifeCycle: LifeCycle, completed: Future[Done]) - def autoSelectPort(hostname: String, udp: Boolean): Int = { - if (udp) { - val socket = DatagramChannel.open().socket() - socket.bind(new InetSocketAddress(hostname, 0)) - val port = socket.getLocalPort - socket.close() - port - } else { - val socket = ServerSocketChannel.open().socket() - socket.bind(new InetSocketAddress(hostname, 0)) - val port = socket.getLocalPort - socket.close() - port - } - } - val ControlStreamId = 1 val OrdinaryStreamId = 2 val LargeStreamId = 3 diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala index 17e437b2bf..b3f051d42f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala @@ -319,24 +319,21 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro settings.Advanced.Aeron.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels else 50 * settings.Advanced.Aeron.IdleCpuLevel - 240 - override protected def runInboundStreams(): Int = { - val bindPort = - (settings.Canonical.Port, settings.Bind.Port) match { - case (0, 0) => - localAddress.address.port match { - case Some(n) if n != 0 => n - case _ => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true) - } - case (0, n) => n - case (_, 0) => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true) - case (_, n) => n - } - if (settings.Bind.Port == 0) - localAddress.address.port match { - case Some(n) if n != 0 => n - case _ => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true) - } else settings.Bind.Port + override protected def bindInboundStreams(): (Int, Int) = { + (settings.Canonical.Port, settings.Bind.Port) match { + case (0, 0) => + val p = autoSelectPort(settings.Bind.Hostname) + (p, p) + case (0, _) => + (settings.Bind.Port, settings.Bind.Port) + case (_, 0) => + (settings.Canonical.Port, autoSelectPort(settings.Bind.Hostname)) + case _ => + (settings.Canonical.Port, settings.Bind.Port) + } + } + override protected def runInboundStreams(port: Int, bindPort: Int): Unit = { val inboundChannel = s"aeron:udp?endpoint=${settings.Bind.Hostname}:$bindPort" runInboundControlStream(inboundChannel) @@ -346,8 +343,6 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro runInboundLargeMessagesStream(inboundChannel) } blockUntilChannelActive() - - bindPort } private def runInboundControlStream(inboundChannel: String): Unit = { @@ -466,4 +461,14 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro }(system.dispatchers.internalDispatcher) } + def autoSelectPort(hostname: String): Int = { + import java.nio.channels.DatagramChannel + import java.net.InetSocketAddress + + val socket = DatagramChannel.open().socket() + socket.bind(new InetSocketAddress(hostname, 0)) + val port = socket.getLocalPort + socket.close() + port + } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 8294e36036..7b539e334c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -78,8 +78,10 @@ private[remote] class ArteryTcpTransport( // may change when inbound streams are restarted @volatile private var inboundKillSwitch: SharedKillSwitch = KillSwitches.shared("inboundKillSwitch") // may change when inbound streams are restarted - @volatile private var inboundStream: OptionVal[Sink[EnvelopeBuffer, NotUsed]] = OptionVal.None @volatile private var serverBinding: Option[ServerBinding] = None + private val firstConnectionFlow = Promise[Flow[ByteString, ByteString, NotUsed]]() + @volatile private var inboundConnectionFlow: Future[Flow[ByteString, ByteString, NotUsed]] = + firstConnectionFlow.future private val sslEngineProvider: OptionVal[SSLEngineProvider] = if (tlsEnabled) { @@ -201,7 +203,61 @@ private[remote] class ArteryTcpTransport( .toMat(Sink.ignore)(Keep.right) } - override protected def runInboundStreams(): Int = { + override protected def bindInboundStreams(): (Int, Int) = { + implicit val sys: ActorSystem = system + implicit val mat: Materializer = materializer + + val bindHost = settings.Bind.Hostname + val bindPort = settings.Bind.Port + + val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] = + if (tlsEnabled) { + val sslProvider = sslEngineProvider.get + Tcp().bindTlsWithSSLEngine( + interface = bindHost, + port = bindPort, + createSSLEngine = () => sslProvider.createServerSSLEngine(bindHost, bindPort), + verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session))) + } else { + Tcp().bind(interface = bindHost, port = bindPort, halfClose = false) + } + + val binding = serverBinding match { + case None => + val afr = createFlightRecorderEventSink() + val binding = connectionSource + .to(Sink.foreach { connection => + afr.loFreq( + TcpInbound_Connected, + s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}") + inboundConnectionFlow.map(connection.handleWith(_))(sys.dispatcher) + }) + .run() + .recoverWith { + case e => + Future.failed( + new RemoteTransportException( + s"Failed to bind TCP to [$bindHost:$bindPort] due to: " + + e.getMessage, + e)) + }(ExecutionContexts.sameThreadExecutionContext) + + // only on initial startup, when ActorSystem is starting + val b = Await.result(binding, settings.Bind.BindTimeout) + afr.loFreq(TcpInbound_Bound, s"$bindHost:${b.localAddress.getPort}") + b + case Some(binding) => + // already bound, when restarting + binding + } + serverBinding = Some(binding) + if (settings.Canonical.Port == 0) + (binding.localAddress.getPort, binding.localAddress.getPort) + else + (settings.Canonical.Port, binding.localAddress.getPort) + } + + override protected def runInboundStreams(port: Int, bindPort: Int): Unit = { // Design note: The design of how to run the inbound streams are influenced by the original design // for the Aeron streams, and there we can only have one single inbound since everything comes in // via the single AeronSource. @@ -216,9 +272,6 @@ private[remote] class ArteryTcpTransport( // have to be changed, such as compression advertisements and materialized values. Number of // inbound streams would be dynamic, and so on. - implicit val mat: Materializer = materializer - implicit val sys: ActorSystem = system - // These streams are always running, only one instance of each inbound stream, and then the inbound connections // are attached to these via a MergeHub. val (controlStream, controlStreamCompleted) = runInboundControlStream() @@ -240,7 +293,7 @@ private[remote] class ArteryTcpTransport( // decide where to attach it based on that byte. Then the streamId wouldn't have to be sent in each // frame. That was not chosen because it is more complicated to implement and might have more runtime // overhead. - inboundStream = OptionVal.Some(Sink.fromGraph(GraphDSL.create() { implicit b => + val inboundStream = Sink.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val partition = b.add(Partition[EnvelopeBuffer](3, env => { env.streamId match { @@ -254,68 +307,22 @@ private[remote] class ArteryTcpTransport( partition.out(1) ~> ordinaryMessagesStream partition.out(2) ~> largeMessagesStream SinkShape(partition.in) - })) + }) // If something in the inboundConnectionFlow fails, e.g. framing, the connection will be teared down, // but other parts of the inbound streams don't have to restarted. - def inboundConnectionFlow: Flow[ByteString, ByteString, NotUsed] = { + val newInboundConnectionFlow = { // must create new Flow for each connection because of the FlightRecorder that can't be shared val afr = createFlightRecorderEventSink() Flow[ByteString] .via(inboundKillSwitch.flow) .via(new TcpFraming(afr)) - .alsoTo(inboundStream.get) + .alsoTo(inboundStream) .filter(_ => false) // don't send back anything in this TCP socket .map(_ => ByteString.empty) // make it a Flow[ByteString] again } - - val bindHost = settings.Bind.Hostname - val bindPort = - if (settings.Bind.Port == 0 && settings.Canonical.Port == 0) - localAddress.address.port match { - case Some(n) => n - case _ => 0 - } else settings.Bind.Port - - val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] = - if (tlsEnabled) { - val sslProvider = sslEngineProvider.get - Tcp().bindTlsWithSSLEngine( - interface = bindHost, - port = bindPort, - createSSLEngine = () => sslProvider.createServerSSLEngine(bindHost, bindPort), - verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session))) - } else { - Tcp().bind(interface = bindHost, port = bindPort, halfClose = false) - } - - serverBinding = serverBinding match { - case None => - val afr = createFlightRecorderEventSink() - val binding = connectionSource - .to(Sink.foreach { connection => - afr.loFreq( - TcpInbound_Connected, - s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}") - connection.handleWith(inboundConnectionFlow) - }) - .run() - .recoverWith { - case e => - Future.failed(new RemoteTransportException( - s"Failed to bind TCP to [${localAddress.address.host.get}:${localAddress.address.port.get}] due to: " + - e.getMessage, - e)) - }(ExecutionContexts.sameThreadExecutionContext) - - // only on initial startup, when ActorSystem is starting - val b = Await.result(binding, settings.Bind.BindTimeout) - afr.loFreq(TcpInbound_Bound, s"$bindHost:${b.localAddress.getPort}") - Some(b) - case s @ Some(_) => - // already bound, when restarting - s - } + firstConnectionFlow.trySuccess(newInboundConnectionFlow) + inboundConnectionFlow = Future.successful(newInboundConnectionFlow) // Failures in any of the inbound streams should be extremely rare, probably an unforeseen accident. // Tear down everything and start over again. Inbound streams are "stateless" so that should be fine. @@ -333,12 +340,10 @@ private[remote] class ArteryTcpTransport( _ <- if (largeMessageChannelEnabled) largeMessagesStreamCompleted.recover { case _ => Done } else Future.successful(Done) } yield Done - allStopped.foreach(_ => runInboundStreams()) + allStopped.foreach(_ => runInboundStreams(port, bindPort)) } attachInboundStreamRestart("Inbound streams", completed, restart) - - serverBinding.get.localAddress.getPort } private def runInboundControlStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = {