Fix race when binding to port '0' in artery tcp (#27525)

* Fix race when binding to port '0' in artery tcp

Splitting the 'binding' and 'starting inbound streams' seems to make
it a bit easier to follow as well
This commit is contained in:
Arnout Engelen 2019-08-27 10:26:15 +02:00 committed by GitHub
parent 3c018c044f
commit b1746d6258
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 119 additions and 116 deletions

View file

@ -37,6 +37,9 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.Th
ProblemFilters.exclude[Problem]("akka.remote.serialization.ArteryMessageSerializer*")
ProblemFilters.exclude[Problem]("akka.remote.*Formats*")
# #27455 Bind to arbitrary port in Artery TCP
# #27455, #27525 Bind to arbitrary port in Artery TCP
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.*.runInboundStreams")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.*.runInboundStreams")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.*.runInboundStreams")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.ArteryTransport.bindInboundStreams")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.autoSelectPort")

View file

@ -4,10 +4,7 @@
package akka.remote.artery
import java.net.InetSocketAddress
import java.nio.channels.DatagramChannel
import java.nio.channels.FileChannel
import java.nio.channels.ServerSocketChannel
import java.nio.file.Path
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
@ -447,21 +444,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
startTransport()
topLevelFlightRecorder.loFreq(Transport_Started, NoMetaData)
val udp = settings.Transport == ArterySettings.AeronUpd
val port =
if (settings.Canonical.Port == 0) {
if (settings.Bind.Port != 0) settings.Bind.Port // if bind port is set, use bind port instead of random
else ArteryTransport.autoSelectPort(settings.Canonical.Hostname, udp)
} else settings.Canonical.Port
_localAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port),
AddressUidExtension(system).longAddressUid)
_addresses = Set(_localAddress.address)
// TODO: This probably needs to be a global value instead of an event as events might rotate out of the log
topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString())
materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system)
controlMaterializer =
ActorMaterializer.systemMaterializer(settings.Advanced.ControlStreamMaterializerSettings, "remoteControl", system)
@ -469,11 +451,21 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
messageDispatcher = new MessageDispatcher(system, provider)
topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData)
val boundPort = runInboundStreams()
val (port, boundPort) = bindInboundStreams()
_localAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port),
AddressUidExtension(system).longAddressUid)
_addresses = Set(_localAddress.address)
_bindAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
AddressUidExtension(system).longAddressUid)
topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString())
runInboundStreams(port, boundPort)
topLevelFlightRecorder.loFreq(Transport_StartupFinished, NoMetaData)
startRemoveQuarantinedAssociationTask()
@ -495,7 +487,21 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
protected def startTransport(): Unit
protected def runInboundStreams(): Int
/**
* Bind to the ports for inbound streams. If '0' is specified, this will also select an
* arbitrary free local port. For UDP, we only select the port and leave the actual
* binding to Aeron when running the inbound stream.
*
* After calling this method the 'localAddress' and 'bindAddress' fields can be set.
*/
protected def bindInboundStreams(): (Int, Int)
/**
* Run the inbound streams that have been previously bound.
*
* Before calling this method the 'localAddress' and 'bindAddress' should have been set.
*/
protected def runInboundStreams(port: Int, bindPort: Int): Unit
private def startRemoveQuarantinedAssociationTask(): Unit = {
val removeAfter = settings.Advanced.RemoveQuarantinedAssociationAfter
@ -1013,22 +1019,6 @@ private[remote] object ArteryTransport {
final case class InboundStreamMatValues[LifeCycle](lifeCycle: LifeCycle, completed: Future[Done])
def autoSelectPort(hostname: String, udp: Boolean): Int = {
if (udp) {
val socket = DatagramChannel.open().socket()
socket.bind(new InetSocketAddress(hostname, 0))
val port = socket.getLocalPort
socket.close()
port
} else {
val socket = ServerSocketChannel.open().socket()
socket.bind(new InetSocketAddress(hostname, 0))
val port = socket.getLocalPort
socket.close()
port
}
}
val ControlStreamId = 1
val OrdinaryStreamId = 2
val LargeStreamId = 3

View file

@ -319,24 +319,21 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
settings.Advanced.Aeron.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels
else 50 * settings.Advanced.Aeron.IdleCpuLevel - 240
override protected def runInboundStreams(): Int = {
val bindPort =
(settings.Canonical.Port, settings.Bind.Port) match {
case (0, 0) =>
localAddress.address.port match {
case Some(n) if n != 0 => n
case _ => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true)
}
case (0, n) => n
case (_, 0) => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true)
case (_, n) => n
}
if (settings.Bind.Port == 0)
localAddress.address.port match {
case Some(n) if n != 0 => n
case _ => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true)
} else settings.Bind.Port
override protected def bindInboundStreams(): (Int, Int) = {
(settings.Canonical.Port, settings.Bind.Port) match {
case (0, 0) =>
val p = autoSelectPort(settings.Bind.Hostname)
(p, p)
case (0, _) =>
(settings.Bind.Port, settings.Bind.Port)
case (_, 0) =>
(settings.Canonical.Port, autoSelectPort(settings.Bind.Hostname))
case _ =>
(settings.Canonical.Port, settings.Bind.Port)
}
}
override protected def runInboundStreams(port: Int, bindPort: Int): Unit = {
val inboundChannel = s"aeron:udp?endpoint=${settings.Bind.Hostname}:$bindPort"
runInboundControlStream(inboundChannel)
@ -346,8 +343,6 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
runInboundLargeMessagesStream(inboundChannel)
}
blockUntilChannelActive()
bindPort
}
private def runInboundControlStream(inboundChannel: String): Unit = {
@ -466,4 +461,14 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
}(system.dispatchers.internalDispatcher)
}
def autoSelectPort(hostname: String): Int = {
import java.nio.channels.DatagramChannel
import java.net.InetSocketAddress
val socket = DatagramChannel.open().socket()
socket.bind(new InetSocketAddress(hostname, 0))
val port = socket.getLocalPort
socket.close()
port
}
}

View file

@ -78,8 +78,10 @@ private[remote] class ArteryTcpTransport(
// may change when inbound streams are restarted
@volatile private var inboundKillSwitch: SharedKillSwitch = KillSwitches.shared("inboundKillSwitch")
// may change when inbound streams are restarted
@volatile private var inboundStream: OptionVal[Sink[EnvelopeBuffer, NotUsed]] = OptionVal.None
@volatile private var serverBinding: Option[ServerBinding] = None
private val firstConnectionFlow = Promise[Flow[ByteString, ByteString, NotUsed]]()
@volatile private var inboundConnectionFlow: Future[Flow[ByteString, ByteString, NotUsed]] =
firstConnectionFlow.future
private val sslEngineProvider: OptionVal[SSLEngineProvider] =
if (tlsEnabled) {
@ -201,7 +203,61 @@ private[remote] class ArteryTcpTransport(
.toMat(Sink.ignore)(Keep.right)
}
override protected def runInboundStreams(): Int = {
override protected def bindInboundStreams(): (Int, Int) = {
implicit val sys: ActorSystem = system
implicit val mat: Materializer = materializer
val bindHost = settings.Bind.Hostname
val bindPort = settings.Bind.Port
val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] =
if (tlsEnabled) {
val sslProvider = sslEngineProvider.get
Tcp().bindTlsWithSSLEngine(
interface = bindHost,
port = bindPort,
createSSLEngine = () => sslProvider.createServerSSLEngine(bindHost, bindPort),
verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session)))
} else {
Tcp().bind(interface = bindHost, port = bindPort, halfClose = false)
}
val binding = serverBinding match {
case None =>
val afr = createFlightRecorderEventSink()
val binding = connectionSource
.to(Sink.foreach { connection =>
afr.loFreq(
TcpInbound_Connected,
s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}")
inboundConnectionFlow.map(connection.handleWith(_))(sys.dispatcher)
})
.run()
.recoverWith {
case e =>
Future.failed(
new RemoteTransportException(
s"Failed to bind TCP to [$bindHost:$bindPort] due to: " +
e.getMessage,
e))
}(ExecutionContexts.sameThreadExecutionContext)
// only on initial startup, when ActorSystem is starting
val b = Await.result(binding, settings.Bind.BindTimeout)
afr.loFreq(TcpInbound_Bound, s"$bindHost:${b.localAddress.getPort}")
b
case Some(binding) =>
// already bound, when restarting
binding
}
serverBinding = Some(binding)
if (settings.Canonical.Port == 0)
(binding.localAddress.getPort, binding.localAddress.getPort)
else
(settings.Canonical.Port, binding.localAddress.getPort)
}
override protected def runInboundStreams(port: Int, bindPort: Int): Unit = {
// Design note: The design of how to run the inbound streams are influenced by the original design
// for the Aeron streams, and there we can only have one single inbound since everything comes in
// via the single AeronSource.
@ -216,9 +272,6 @@ private[remote] class ArteryTcpTransport(
// have to be changed, such as compression advertisements and materialized values. Number of
// inbound streams would be dynamic, and so on.
implicit val mat: Materializer = materializer
implicit val sys: ActorSystem = system
// These streams are always running, only one instance of each inbound stream, and then the inbound connections
// are attached to these via a MergeHub.
val (controlStream, controlStreamCompleted) = runInboundControlStream()
@ -240,7 +293,7 @@ private[remote] class ArteryTcpTransport(
// decide where to attach it based on that byte. Then the streamId wouldn't have to be sent in each
// frame. That was not chosen because it is more complicated to implement and might have more runtime
// overhead.
inboundStream = OptionVal.Some(Sink.fromGraph(GraphDSL.create() { implicit b =>
val inboundStream = Sink.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val partition = b.add(Partition[EnvelopeBuffer](3, env => {
env.streamId match {
@ -254,68 +307,22 @@ private[remote] class ArteryTcpTransport(
partition.out(1) ~> ordinaryMessagesStream
partition.out(2) ~> largeMessagesStream
SinkShape(partition.in)
}))
})
// If something in the inboundConnectionFlow fails, e.g. framing, the connection will be teared down,
// but other parts of the inbound streams don't have to restarted.
def inboundConnectionFlow: Flow[ByteString, ByteString, NotUsed] = {
val newInboundConnectionFlow = {
// must create new Flow for each connection because of the FlightRecorder that can't be shared
val afr = createFlightRecorderEventSink()
Flow[ByteString]
.via(inboundKillSwitch.flow)
.via(new TcpFraming(afr))
.alsoTo(inboundStream.get)
.alsoTo(inboundStream)
.filter(_ => false) // don't send back anything in this TCP socket
.map(_ => ByteString.empty) // make it a Flow[ByteString] again
}
val bindHost = settings.Bind.Hostname
val bindPort =
if (settings.Bind.Port == 0 && settings.Canonical.Port == 0)
localAddress.address.port match {
case Some(n) => n
case _ => 0
} else settings.Bind.Port
val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] =
if (tlsEnabled) {
val sslProvider = sslEngineProvider.get
Tcp().bindTlsWithSSLEngine(
interface = bindHost,
port = bindPort,
createSSLEngine = () => sslProvider.createServerSSLEngine(bindHost, bindPort),
verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session)))
} else {
Tcp().bind(interface = bindHost, port = bindPort, halfClose = false)
}
serverBinding = serverBinding match {
case None =>
val afr = createFlightRecorderEventSink()
val binding = connectionSource
.to(Sink.foreach { connection =>
afr.loFreq(
TcpInbound_Connected,
s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}")
connection.handleWith(inboundConnectionFlow)
})
.run()
.recoverWith {
case e =>
Future.failed(new RemoteTransportException(
s"Failed to bind TCP to [${localAddress.address.host.get}:${localAddress.address.port.get}] due to: " +
e.getMessage,
e))
}(ExecutionContexts.sameThreadExecutionContext)
// only on initial startup, when ActorSystem is starting
val b = Await.result(binding, settings.Bind.BindTimeout)
afr.loFreq(TcpInbound_Bound, s"$bindHost:${b.localAddress.getPort}")
Some(b)
case s @ Some(_) =>
// already bound, when restarting
s
}
firstConnectionFlow.trySuccess(newInboundConnectionFlow)
inboundConnectionFlow = Future.successful(newInboundConnectionFlow)
// Failures in any of the inbound streams should be extremely rare, probably an unforeseen accident.
// Tear down everything and start over again. Inbound streams are "stateless" so that should be fine.
@ -333,12 +340,10 @@ private[remote] class ArteryTcpTransport(
_ <- if (largeMessageChannelEnabled)
largeMessagesStreamCompleted.recover { case _ => Done } else Future.successful(Done)
} yield Done
allStopped.foreach(_ => runInboundStreams())
allStopped.foreach(_ => runInboundStreams(port, bindPort))
}
attachInboundStreamRestart("Inbound streams", completed, restart)
serverBinding.get.localAddress.getPort
}
private def runInboundControlStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = {