handle longer network partitions, #21399

* system messages in flight should not trigger premature quarantine
  in case of longer network partitions, therefore we keep the control
  stream alive
* add give-up-system-message-after property that is used by both
  SystemMessageDelivery and AeronSink in the control stream
* also unwrap SystemMessageEnvelope in RemoteDeadLetterActorRef
* skip sending control messages after shutdown, can be triggered
  by scheduled compression advertisement
This commit is contained in:
Patrik Nordwall 2016-09-09 07:45:21 +02:00
parent cd4a31e74d
commit 1584c52190
12 changed files with 181 additions and 31 deletions

View file

@ -82,7 +82,7 @@ abstract class AeronStreamConsistencySpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
val giveUpMessageAfter = 30.seconds
override def afterAll(): Unit = {
taskRunner.stop()
@ -98,7 +98,7 @@ abstract class AeronStreamConsistencySpec
runOn(second) {
// just echo back
Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
}
enterBarrier("echo-started")
}
@ -139,7 +139,7 @@ abstract class AeronStreamConsistencySpec
envelope
}
.throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
started.expectMsg(Done)
}
@ -151,7 +151,7 @@ abstract class AeronStreamConsistencySpec
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
Await.ready(done, 20.seconds)
killSwitch.shutdown()

View file

@ -115,7 +115,7 @@ abstract class AeronStreamLatencySpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
val giveUpMessageAfter = 30.seconds
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): TestRateReporter = {
@ -245,7 +245,7 @@ abstract class AeronStreamLatencySpec
envelope
}
.throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
started.expectMsg(Done)
}
@ -264,7 +264,7 @@ abstract class AeronStreamLatencySpec
val queueValue = Source.fromGraph(new SendQueue[Unit])
.via(sendFlow)
.to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
.run()
val queue = new ManyToOneConcurrentArrayQueue[Unit](1024)
@ -314,7 +314,7 @@ abstract class AeronStreamLatencySpec
runOn(second) {
// just echo back
Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
}
enterBarrier("echo-started")
}

View file

@ -115,7 +115,7 @@ abstract class AeronStreamMaxThroughputSpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
val giveUpMessageAfter = 30.seconds
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): TestRateReporter = {
@ -213,7 +213,7 @@ abstract class AeronStreamMaxThroughputSpec
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
printStats("sender")
enterBarrier(testName + "-done")

View file

@ -0,0 +1,113 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor._
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.remote.RARP
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.remote.QuarantinedEvent
/**
 * Multi-node configuration for the network partition spec: two roles, Artery
 * remoting enabled and `give-up-system-message-after` lowered to 4 seconds.
 */
object SurviveNetworkPartitionSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")

  // Settings layered underneath the (disabled) debug config.
  private val arteryConfig = ConfigFactory.parseString("""
akka.loglevel = INFO
akka.remote.artery.enabled = on
akka.remote.artery.advanced.give-up-system-message-after = 4s
""")

  commonConfig(debugConfig(on = false).withFallback(arteryConfig))

  // NOTE(review): presumably required for the testConductor.blackhole/passThrough
  // calls made by the spec - confirm against the MultiNodeConfig documentation.
  testTransport(on = true)
}
// Concrete spec classes; both run the same abstract SurviveNetworkPartitionSpec.
// NOTE(review): the MultiJvmNode1/MultiJvmNode2 suffixes presumably follow the naming
// convention of the multi-JVM test runner (one class per node) - confirm against the build.
class SurviveNetworkPartitionSpecMultiJvmNode1 extends SurviveNetworkPartitionSpec
class SurviveNetworkPartitionSpecMultiJvmNode2 extends SurviveNetworkPartitionSpec
/**
 * Multi-node test on the roles `first` and `second`. In both cases `first` watches
 * an echo actor on `second` (the watch is the system message under test) while the
 * link between the two nodes is blackholed in both directions:
 *
 *  - if the link is restored before `give-up-system-message-after` has elapsed,
 *    messaging resumes and the watch still delivers `Terminated` once the actor stops
 *  - if the link is not restored, a `QuarantinedEvent` for `second` is published and
 *    the watch is completed with `Terminated`
 */
abstract class SurviveNetworkPartitionSpec
extends MultiNodeSpec(SurviveNetworkPartitionSpec)
with STMultiNodeSpec with ImplicitSender {
import SurviveNetworkPartitionSpec._
// every declared role takes part from the start
override def initialParticipants = roles.size
"Network partition" must {
"not quarantine system when it heals within 'give-up-system-message-after'" taggedAs LongRunningTest in {
runOn(second) {
system.actorOf(TestActors.echoActorProps, "echo1")
}
enterBarrier("echo-started")
runOn(first) {
// resolve the remote echo actor and verify the round trip before partitioning
system.actorSelection(node(second) / "user" / "echo1") ! Identify(None)
val ref = expectMsgType[ActorIdentity].ref.get
ref ! "ping1"
expectMsg("ping1")
// network partition
testConductor.blackhole(first, second, Direction.Both).await
// send system message during network partition
watch(ref)
// keep the network partition for a while, but shorter than give-up-system-message-after
// (the configured timeout minus 2 seconds); nothing may arrive at this test actor meanwhile
expectNoMsg(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 2.second)
// heal the network partition
testConductor.passThrough(first, second, Direction.Both).await
// not quarantined
ref ! "ping2"
expectMsg("ping2")
// the watch registered during the partition must still result in Terminated
ref ! PoisonPill
expectTerminated(ref)
}
enterBarrier("done")
}
"quarantine system when it doesn't heal within 'give-up-system-message-after'" taggedAs LongRunningTest in {
runOn(second) {
system.actorOf(TestActors.echoActorProps, "echo2")
}
enterBarrier("echo-started")
runOn(first) {
// separate probe so that quarantine events do not mix with messages to the test actor
val qProbe = TestProbe()
system.eventStream.subscribe(qProbe.ref, classOf[QuarantinedEvent])
system.actorSelection(node(second) / "user" / "echo2") ! Identify(None)
val ref = expectMsgType[ActorIdentity].ref.get
ref ! "ping1"
expectMsg("ping1")
// network partition
testConductor.blackhole(first, second, Direction.Both).await
// send system message during network partition
watch(ref)
// nothing may arrive at this test actor until 1 second before give-up-system-message-after
// has elapsed; the partition is never healed in this test, so in total it lasts longer
// than that timeout
expectNoMsg(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 1.second)
// the quarantine of the second node must then be published within 5 seconds
qProbe.expectMsgType[QuarantinedEvent](5.seconds).address should ===(node(second).address)
expectTerminated(ref)
}
enterBarrier("done")
}
}
}

View file

@ -246,7 +246,12 @@ akka {
inject-handshake-interval = 1 second
# messages that are not accepted by Aeron are dropped after retrying for this period
give-up-send-after = 60 seconds
give-up-message-after = 60 seconds
# System messages that are not acknowledged after re-sending for this period are
# dropped and will trigger quarantine. The value should be longer than the length
# of a network partition that you need to survive.
give-up-system-message-after = 6 hours
# during ActorSystem termination the remoting will wait this long for
# an acknowledgment by the destination system that flushing of outstanding

View file

@ -20,6 +20,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.remote.artery.ArteryTransport
import akka.util.OptionVal
import akka.remote.artery.OutboundEnvelope
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
/**
* INTERNAL API
@ -98,14 +99,19 @@ private[akka] object RemoteActorRefProvider {
// the dead letter status
if (seqOpt.isEmpty) super.!(DeadLetter(m, senderOption.getOrElse(_provider.deadLetters), recipient))
case env: OutboundEnvelope
super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters),
super.!(DeadLetter(unwrapSystemMessageEnvelope(env.message), env.sender.getOrElse(_provider.deadLetters),
env.recipient.getOrElse(_provider.deadLetters)))
case DeadLetter(env: OutboundEnvelope, _, _)
super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters),
super.!(DeadLetter(unwrapSystemMessageEnvelope(env.message), env.sender.getOrElse(_provider.deadLetters),
env.recipient.getOrElse(_provider.deadLetters)))
case _ super.!(message)(sender)
}
/**
 * Strips the [[SystemMessageEnvelope]] wrapper, if present, so that the
 * `DeadLetter` published by the callers carries the wrapped message itself.
 *
 * @param msg the message taken from an outbound envelope
 * @return the message inside the `SystemMessageEnvelope`, or `msg` unchanged
 *         when it is not such an envelope
 */
private def unwrapSystemMessageEnvelope(msg: AnyRef): AnyRef = msg match {
  case SystemMessageEnvelope(m, _, _) => m
  case _                              => msg
}
@throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}

View file

@ -3,6 +3,7 @@
*/
package akka.remote.artery
import akka.util.PrettyDuration.PrettyPrintableDuration
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
@ -28,7 +29,7 @@ import org.agrona.hints.ThreadHints
object AeronSink {
final class GaveUpSendingException(msg: String) extends RuntimeException(msg) with NoStackTrace
final class GaveUpMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace
final class PublicationClosedException(msg: String) extends RuntimeException(msg) with NoStackTrace
@ -80,7 +81,7 @@ class AeronSink(
aeron: Aeron,
taskRunner: TaskRunner,
pool: EnvelopeBufferPool,
giveUpSendAfter: Duration,
giveUpAfter: Duration,
flightRecorder: EventSink)
extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] {
import AeronSink._
@ -104,7 +105,7 @@ class AeronSink(
private var backoffCount = spinning
private var lastMsgSize = 0
private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ taskOnOfferSuccess()),
giveUpSendAfter, getAsyncCallback(_ onGiveUp()), getAsyncCallback(_ onPublicationClosed()))
giveUpAfter, getAsyncCallback(_ onGiveUp()), getAsyncCallback(_ onPublicationClosed()))
private val addOfferTask: Add = Add(offerTask)
private var offerTaskInProgress = false
@ -191,7 +192,7 @@ class AeronSink(
private def onGiveUp(): Unit = {
offerTaskInProgress = false
val cause = new GaveUpSendingException(s"Gave up sending message to $channel after $giveUpSendAfter.")
val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.")
flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.getMessage.getBytes("US-ASCII"))
completedValue = Failure(cause)
failStage(cause)

View file

@ -89,8 +89,10 @@ private[akka] final class ArterySettings private (config: Config) {
interval > Duration.Zero, "handshake-retry-interval must be more than zero")
val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval
interval > Duration.Zero, "inject-handshake-interval must be more than zero")
val GiveUpSendAfter = config.getMillisDuration("give-up-send-after").requiring(interval
interval > Duration.Zero, "give-up-send-after must be more than zero")
val GiveUpMessageAfter = config.getMillisDuration("give-up-message-after").requiring(interval
interval > Duration.Zero, "give-up-message-after must be more than zero")
val GiveUpSystemMessageAfter = config.getMillisDuration("give-up-system-message-after").requiring(interval
interval > Duration.Zero, "give-up-system-message-after must be more than zero")
val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval
interval > Duration.Zero, "shutdown-flush-timeout must be more than zero")
val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval

View file

@ -876,8 +876,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
aeronSink(outboundContext, ordinaryStreamId)
private def aeronSink(outboundContext: OutboundContext, streamId: Int): Sink[EnvelopeBuffer, Future[Done]] = {
val giveUpAfter =
if (streamId == controlStreamId) settings.Advanced.GiveUpSystemMessageAfter
else settings.Advanced.GiveUpMessageAfter
Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner,
envelopeBufferPool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink()))
envelopeBufferPool, giveUpAfter, createFlightRecorderEventSink()))
}
def outboundLane(outboundContext: OutboundContext): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] =

View file

@ -24,7 +24,7 @@ import akka.event.Logging
import akka.remote._
import akka.remote.DaemonMsgCreate
import akka.remote.QuarantinedEvent
import akka.remote.artery.AeronSink.GaveUpSendingException
import akka.remote.artery.AeronSink.GaveUpMessageException
import akka.remote.artery.Encoder.ChangeOutboundCompression
import akka.remote.artery.Encoder.ChangeOutboundCompressionFailed
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
@ -589,7 +589,7 @@ private[remote] class Association(
// don't restart after shutdown, but log some details so we notice
log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
case _: AbruptTerminationException // ActorSystem shutdown
case cause: GaveUpSendingException
case cause: GaveUpMessageException
log.debug("{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
// restart unconditionally, without counting restarts
lazyRestart()

View file

@ -3,6 +3,7 @@
*/
package akka.remote.artery
import akka.util.PrettyDuration.PrettyPrintableDuration
import java.util.ArrayDeque
import scala.annotation.tailrec
import scala.concurrent.duration._
@ -26,6 +27,7 @@ import akka.actor.ActorRef
import akka.remote.PriorityMessage
import akka.actor.ActorSelectionMessage
import akka.dispatch.sysmsg.SystemMessage
import scala.util.control.NoStackTrace
/**
* INTERNAL API
@ -38,11 +40,14 @@ private[akka] object SystemMessageDelivery {
final case object ClearSystemMessageDelivery
final class GaveUpSystemMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace
private case object ResendTick
// If other message types than SystemMessage need acked delivery they can extend this trait.
// Used in tests since real SystemMessage are somewhat cumbersome to create.
trait AckedDeliveryMessage
}
/**
@ -71,6 +76,9 @@ private[akka] class SystemMessageDelivery(
private var resendingFromSeqNo = -1L
private var stopping = false
private val giveUpAfterNanos = outboundContext.settings.Advanced.GiveUpSystemMessageAfter.toNanos
private var ackTimestamp = System.nanoTime()
private def localAddress = outboundContext.localAddress
private def remoteAddress = outboundContext.remoteAddress
@ -109,13 +117,13 @@ private[akka] class SystemMessageDelivery(
/**
 * Timer callback. On `ResendTick` it
 *  1. calls `checkGiveUp()`, which throws when unacknowledged messages have been
 *     waiting for an ack longer than the give-up timeout,
 *  2. starts a resend of all unacknowledged messages unless a resend is already
 *     in progress,
 *  3. schedules the next tick for as long as anything is unacknowledged.
 */
override protected def onTimer(timerKey: Any): Unit =
  timerKey match {
    case ResendTick =>
      checkGiveUp()
      if (resending.isEmpty && !unacknowledged.isEmpty) {
        // resend from a copy; `unacknowledged` itself is only trimmed by acks
        resending = unacknowledged.clone()
        tryResend()
      }
      if (!unacknowledged.isEmpty)
        scheduleOnce(ResendTick, resendInterval)
  }
// ControlMessageObserver, external call
@ -141,6 +149,7 @@ private[akka] class SystemMessageDelivery(
}
/**
 * Handles an acknowledgement of sequence number `n`: records the time of the
 * ack (the reference point used by `checkGiveUp`) and, unless `n` is beyond the
 * last sequence number sent, clears the unacknowledged messages via
 * `clearUnacknowledged(n)`.
 */
private def ack(n: Long): Unit = {
  ackTimestamp = System.nanoTime()
  val alreadySent = n <= seqNo
  if (alreadySent) clearUnacknowledged(n)
}
@ -176,6 +185,10 @@ private[akka] class SystemMessageDelivery(
case msg @ (_: SystemMessage | _: AckedDeliveryMessage)
if (unacknowledged.size < maxBufferSize) {
seqNo += 1
if (unacknowledged.isEmpty)
ackTimestamp = System.nanoTime()
else
checkGiveUp()
val sendEnvelope = outboundEnvelope.withMessage(SystemMessageEnvelope(msg, seqNo, localAddress))
unacknowledged.offer(sendEnvelope)
scheduleOnce(ResendTick, resendInterval)
@ -209,6 +222,13 @@ private[akka] class SystemMessageDelivery(
}
}
/**
 * Fails the stage logic with a [[GaveUpSystemMessageException]] when there are
 * unacknowledged messages and the time since `ackTimestamp` exceeds
 * `giveUpAfterNanos`. Does nothing otherwise.
 */
private def checkGiveUp(): Unit = {
  val hasPending = !unacknowledged.isEmpty
  if (hasPending) {
    // only read the clock when there is something that could time out
    val nanosSinceAck = System.nanoTime() - ackTimestamp
    if (nanosSinceAck > giveUpAfterNanos)
      throw new GaveUpSystemMessageException(
        s"Gave up sending system message to [${outboundContext.remoteAddress}] after " +
          s"${outboundContext.settings.Advanced.GiveUpSystemMessageAfter.pretty}.")
  }
}
private def clear(): Unit = {
sendUnacknowledgedToDeadLetters()
seqNo = 0L // sequence number for the first message will be 1

View file

@ -10,7 +10,7 @@ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.ExtendedActorSystem
import akka.remote.artery.AeronSink.GaveUpSendingException
import akka.remote.artery.AeronSink.GaveUpMessageException
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Sink
@ -75,7 +75,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender {
.runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis, IgnoreEventSink))
// without the give up timeout the stream would not complete/fail
intercept[GaveUpSendingException] {
intercept[GaveUpMessageException] {
Await.result(done, 5.seconds)
}
}