Fix race conditions in Artery TCP binding to 0 (#27456)

This commit is contained in:
Arnout Engelen 2019-08-06 15:21:28 +02:00 committed by Helena Edelson
parent dd7dad1b7b
commit 38cda5147f
4 changed files with 70 additions and 38 deletions

View file

@ -32,3 +32,7 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.Inbound
# #22333 Disable Java serialization
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.ThrowableSupport.stackTraceElementBuilder")
# #27455 Bind to arbitrary port in Artery TCP
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.*.runInboundStreams")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.*.runInboundStreams")

View file

@ -454,20 +454,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
else ArteryTransport.autoSelectPort(settings.Canonical.Hostname, udp)
} else settings.Canonical.Port
val bindPort = if (settings.Bind.Port == 0) {
if (settings.Canonical.Port == 0) port // canonical and bind ports are zero. Use random port for both
else ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp)
} else settings.Bind.Port
_localAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port),
AddressUidExtension(system).longAddressUid)
_addresses = Set(_localAddress.address)
_bindAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, bindPort),
AddressUidExtension(system).longAddressUid)
// TODO: This probably needs to be a global value instead of an event as events might rotate out of the log
topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString())
@ -478,7 +469,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
messageDispatcher = new MessageDispatcher(system, provider)
topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData)
runInboundStreams()
val boundPort = runInboundStreams()
_bindAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
AddressUidExtension(system).longAddressUid)
topLevelFlightRecorder.loFreq(Transport_StartupFinished, NoMetaData)
startRemoveQuarantinedAssociationTask()
@ -496,12 +491,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
bindAddress.address,
localAddress.uid)
}
}
protected def startTransport(): Unit
protected def runInboundStreams(): Unit
protected def runInboundStreams(): Int
private def startRemoveQuarantinedAssociationTask(): Unit = {
val removeAfter = settings.Advanced.RemoveQuarantinedAssociationAfter

View file

@ -68,7 +68,6 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
private val taskRunner = new TaskRunner(system, settings.Advanced.Aeron.IdleCpuLevel)
private def inboundChannel = s"aeron:udp?endpoint=${bindAddress.address.host.get}:${bindAddress.address.port.get}"
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
override protected def startTransport(): Unit = {
@ -301,7 +300,10 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
createFlightRecorderEventSink()))
}
private def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, AeronSource.AeronLifecycle] =
private def aeronSource(
streamId: Int,
pool: EnvelopeBufferPool,
inboundChannel: String): Source[EnvelopeBuffer, AeronSource.AeronLifecycle] =
Source.fromGraph(
new AeronSource(
inboundChannel,
@ -317,21 +319,42 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
settings.Advanced.Aeron.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels
else 50 * settings.Advanced.Aeron.IdleCpuLevel - 240
// NOTE(review): this span comes from a diff rendered without +/- markers. The next three lines
// look like the pre-change `Unit` version of the method, and the lines after them like its
// `Int`-returning replacement — confirm against the real file before editing.
override protected def runInboundStreams(): Unit = {
runInboundControlStream()
runInboundOrdinaryMessagesStream()
// Picks the port for the inbound Aeron channel, starts the control, ordinary and (when enabled)
// large-message inbound streams on that channel, calls blockUntilChannelActive(), and returns
// the chosen port.
override protected def runInboundStreams(): Int = {
// Port selection is keyed on the configured (canonical port, bind port) pair.
val bindPort =
(settings.Canonical.Port, settings.Bind.Port) match {
case (0, 0) =>
// Both configured as 0: reuse the port already present in localAddress when it is non-zero,
// otherwise ask ArteryTransport.autoSelectPort for one on the bind hostname.
localAddress.address.port match {
case Some(n) if n != 0 => n
case _ => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true)
}
// An explicitly configured bind port is used as is.
case (0, n) => n
// Only the bind port is 0: auto-select one for the bind hostname.
case (_, 0) => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true)
case (_, n) => n
}
// NOTE(review): the value of the if/else expression below is neither assigned nor returned, so
// it is discarded. It repeats the port selection above, and its fallback branch would call
// autoSelectPort again if reached. Looks like a leftover — confirm and remove.
if (settings.Bind.Port == 0)
localAddress.address.port match {
case Some(n) if n != 0 => n
case _ => ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp = true)
} else settings.Bind.Port
// The channel URI is built from the bind hostname and the port chosen above, and is handed to
// every inbound stream started below.
val inboundChannel = s"aeron:udp?endpoint=${settings.Bind.Hostname}:$bindPort"
runInboundControlStream(inboundChannel)
runInboundOrdinaryMessagesStream(inboundChannel)
if (largeMessageChannelEnabled) {
// NOTE(review): the no-argument call on the next line appears to be the pre-change line; the
// call after it, which passes inboundChannel, appears to be its replacement.
runInboundLargeMessagesStream()
runInboundLargeMessagesStream(inboundChannel)
}
blockUntilChannelActive()
// The chosen port is the method's result.
bindPort
}
private def runInboundControlStream(): Unit = {
private def runInboundControlStream(inboundChannel: String): Unit = {
if (isShutdown) throw ShuttingDown
val (resourceLife, ctrl, completed) =
aeronSource(ControlStreamId, envelopeBufferPool)
aeronSource(ControlStreamId, envelopeBufferPool, inboundChannel)
.via(inboundFlow(settings, NoInboundCompressions))
.toMat(inboundControlSink)({ case (a, (c, d)) => (a, c, d) })
.run()(controlMaterializer)
@ -339,15 +362,15 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
attachControlMessageObserver(ctrl)
updateStreamMatValues(ControlStreamId, resourceLife, completed)
attachInboundStreamRestart("Inbound control stream", completed, () => runInboundControlStream())
attachInboundStreamRestart("Inbound control stream", completed, () => runInboundControlStream(inboundChannel))
}
private def runInboundOrdinaryMessagesStream(): Unit = {
private def runInboundOrdinaryMessagesStream(inboundChannel: String): Unit = {
if (isShutdown) throw ShuttingDown
val (resourceLife, inboundCompressionAccess, completed) =
if (inboundLanes == 1) {
aeronSource(OrdinaryStreamId, envelopeBufferPool)
aeronSource(OrdinaryStreamId, envelopeBufferPool, inboundChannel)
.viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both)
.toMat(inboundSink(envelopeBufferPool))({ case ((a, b), c) => (a, b, c) })
.run()(materializer)
@ -355,7 +378,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
} else {
val laneKillSwitch = KillSwitches.shared("laneKillSwitch")
val laneSource: Source[InboundEnvelope, (AeronLifecycle, InboundCompressionAccess)] =
aeronSource(OrdinaryStreamId, envelopeBufferPool)
aeronSource(OrdinaryStreamId, envelopeBufferPool, inboundChannel)
.via(laneKillSwitch.flow)
.viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both)
.via(Flow.fromGraph(new DuplicateHandshakeReq(inboundLanes, this, system, envelopeBufferPool)))
@ -395,19 +418,25 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
setInboundCompressionAccess(inboundCompressionAccess)
updateStreamMatValues(OrdinaryStreamId, resourceLife, completed)
attachInboundStreamRestart("Inbound message stream", completed, () => runInboundOrdinaryMessagesStream())
attachInboundStreamRestart(
"Inbound message stream",
completed,
() => runInboundOrdinaryMessagesStream(inboundChannel))
}
// NOTE(review): this span comes from a diff rendered without +/- markers. Where two near-identical
// lines follow each other, the first looks like the pre-change form and the second like the
// post-change form that threads `inboundChannel` through — confirm against the real file.
//
// Starts the inbound stream for LargeStreamId on the given channel: throws ShuttingDown when
// isShutdown is set, runs aeronSource through inboundLargeFlow into inboundSink, records the
// materialized values with updateStreamMatValues, and registers a restart callback that calls
// this method again.
private def runInboundLargeMessagesStream(): Unit = {
private def runInboundLargeMessagesStream(inboundChannel: String): Unit = {
if (isShutdown) throw ShuttingDown
// Keep.both retains the source's materialized value (resourceLife) and the sink's (completed).
val (resourceLife, completed) = aeronSource(LargeStreamId, largeEnvelopeBufferPool)
val (resourceLife, completed) = aeronSource(LargeStreamId, largeEnvelopeBufferPool, inboundChannel)
.via(inboundLargeFlow(settings))
.toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both)
.run()(materializer)
updateStreamMatValues(LargeStreamId, resourceLife, completed)
// In the multi-line form, the restart callback captures inboundChannel, so a restarted stream
// is started with the same channel string.
attachInboundStreamRestart("Inbound large message stream", completed, () => runInboundLargeMessagesStream())
attachInboundStreamRestart(
"Inbound large message stream",
completed,
() => runInboundLargeMessagesStream(inboundChannel))
}
private def updateStreamMatValues(

View file

@ -79,7 +79,7 @@ private[remote] class ArteryTcpTransport(
@volatile private var inboundKillSwitch: SharedKillSwitch = KillSwitches.shared("inboundKillSwitch")
// may change when inbound streams are restarted
@volatile private var inboundStream: OptionVal[Sink[EnvelopeBuffer, NotUsed]] = OptionVal.None
@volatile private var serverBinding: Option[Future[ServerBinding]] = None
@volatile private var serverBinding: Option[ServerBinding] = None
private val sslEngineProvider: OptionVal[SSLEngineProvider] =
if (tlsEnabled) {
@ -201,8 +201,7 @@ private[remote] class ArteryTcpTransport(
.toMat(Sink.ignore)(Keep.right)
}
override protected def runInboundStreams(): Unit = {
override protected def runInboundStreams(): Int = {
// Design note: The design of how to run the inbound streams is influenced by the original design
// for the Aeron streams, and there we can only have one single inbound since everything comes in
// via the single AeronSource.
@ -270,8 +269,13 @@ private[remote] class ArteryTcpTransport(
.map(_ => ByteString.empty) // make it a Flow[ByteString] again
}
val bindHost = bindAddress.address.host.get
val bindPort = bindAddress.address.port.get
val bindHost = settings.Bind.Hostname
val bindPort =
if (settings.Bind.Port == 0 && settings.Canonical.Port == 0)
localAddress.address.port match {
case Some(n) => n
case _ => 0
} else settings.Bind.Port
val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] =
if (tlsEnabled) {
@ -305,9 +309,9 @@ private[remote] class ArteryTcpTransport(
}(ExecutionContexts.sameThreadExecutionContext)
// only on initial startup, when ActorSystem is starting
Await.result(binding, settings.Bind.BindTimeout)
afr.loFreq(TcpInbound_Bound, s"$bindHost:$bindPort")
Some(binding)
val b = Await.result(binding, settings.Bind.BindTimeout)
afr.loFreq(TcpInbound_Bound, s"$bindHost:${b.localAddress.getPort}")
Some(b)
case s @ Some(_) =>
// already bound, when restarting
s
@ -333,6 +337,8 @@ private[remote] class ArteryTcpTransport(
}
attachInboundStreamRestart("Inbound streams", completed, restart)
serverBinding.get.localAddress.getPort
}
private def runInboundControlStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = {
@ -454,8 +460,7 @@ private[remote] class ArteryTcpTransport(
case Some(binding) =>
implicit val ec = system.dispatchers.internalDispatcher
for {
b <- binding
_ <- b.unbind()
_ <- binding.unbind()
} yield {
topLevelFlightRecorder.loFreq(
TcpInbound_Bound,