=rem #16623: Fix concurrent reliable delivery actors

(cherry picked from commit 4f7c90660226b05edc3581454d18fd53a4762802)

Conflicts:
	project/AkkaBuild.scala
This commit is contained in:
Patrik Nordwall 2015-01-19 10:03:40 +01:00
parent cedfaa35cc
commit a1d7199a42
9 changed files with 294 additions and 75 deletions

View file

@ -0,0 +1,90 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.Props
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.actor.PoisonPill
/**
 * Multi-node configuration for the cluster variant of the system message
 * redelivery test. Declares three roles and layers the shared cluster test
 * configuration underneath the (disabled) debug configuration.
 */
object AttemptSysMsgRedeliveryMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))

  // The spec in this file drives testConductor.blackhole / passThrough, so the
  // test transport is switched on here.
  testTransport(on = true)

  /** Replies to the sender with whatever message it receives. */
  class Echo extends Actor {
    def receive = {
      // The case arrow was missing here, which left the partial function unparsable.
      case m => sender ! m
    }
  }
}
// One concrete spec class per role; each adds nothing beyond the shared AttemptSysMsgRedeliverySpec.
// NOTE(review): the MultiJvmNodeN suffix is presumably what the multi-JVM test runner keys on — confirm.
class AttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec
class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec
class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec
/**
 * Cluster multi-node spec: after the link between `first` and `second` has been
 * blackholed and then restored, watchers registered during the blackhole must
 * still observe termination of the remote echo actors.
 */
class AttemptSysMsgRedeliverySpec extends MultiNodeSpec(AttemptSysMsgRedeliveryMultiJvmSpec)
  with MultiNodeClusterSpec with ImplicitSender with DefaultTimeout {
  import AttemptSysMsgRedeliveryMultiJvmSpec._

  "AttemptSysMsgRedelivery" must {
    "reach initial convergence" taggedAs LongRunningTest in {
      awaitClusterUp(first, second, third)
      enterBarrier("after-1")
    }

    "redeliver system message after inactivity" taggedAs LongRunningTest in {
      // Every node starts a local echo actor under the same name.
      system.actorOf(Props[Echo], "echo")
      enterBarrier("echo-started")

      // Resolve the echo actors living on `first` and `second` into ActorRefs.
      // `.ref.get` fails the test immediately if either identification comes back empty.
      system.actorSelection(node(first) / "user" / "echo") ! Identify(None)
      val firstRef: ActorRef = expectMsgType[ActorIdentity].ref.get
      system.actorSelection(node(second) / "user" / "echo") ! Identify(None)
      val secondRef: ActorRef = expectMsgType[ActorIdentity].ref.get
      enterBarrier("refs-retrieved")

      // Only `first` issues the command; it blackholes first <-> second in both directions.
      runOn(first) {
        testConductor.blackhole(first, second, Direction.Both).await
      }
      enterBarrier("blackhole")

      // The watches are registered while the first <-> second link is still blackholed.
      // `third` also watches the echo on `second`; its link is not part of the blackhole.
      runOn(first, third) {
        watch(secondRef)
      }
      runOn(second) {
        watch(firstRef)
      }
      enterBarrier("watch-established")

      // Restore traffic between first and second.
      runOn(first) {
        testConductor.passThrough(first, second, Direction.Both).await
      }
      enterBarrier("pass-through")

      // Each node stops its own local echo actor ...
      system.actorSelection("/user/echo") ! PoisonPill

      // ... and every watcher must be told about the termination within 10 seconds.
      runOn(first, third) {
        expectTerminated(secondRef, 10.seconds)
      }
      runOn(second) {
        expectTerminated(firstRef, 10.seconds)
      }
      enterBarrier("done")
    }
  }
}

View file

@ -0,0 +1,86 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.Props
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.actor.PoisonPill
/**
 * Multi-node configuration for the plain-remoting variant of the system
 * message redelivery test. Declares three roles and uses only the (disabled)
 * debug configuration.
 */
object AttemptSysMsgRedeliveryMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  commonConfig(debugConfig(on = false))

  // The spec in this file drives testConductor.blackhole / passThrough, so the
  // test transport is switched on here.
  testTransport(on = true)

  /** Replies to the sender with whatever message it receives. */
  class Echo extends Actor {
    def receive = {
      // The case arrow was missing here, which left the partial function unparsable.
      case m => sender ! m
    }
  }
}
// One concrete spec class per role; each adds nothing beyond the shared AttemptSysMsgRedeliverySpec.
// NOTE(review): the MultiJvmNodeN suffix is presumably what the multi-JVM test runner keys on — confirm.
class AttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec
class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec
class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec
/**
 * Plain-remoting multi-node spec: after the link between `first` and `second`
 * has been blackholed and then restored, watchers registered during the
 * blackhole must still observe termination of the remote echo actors.
 */
class AttemptSysMsgRedeliverySpec extends MultiNodeSpec(AttemptSysMsgRedeliveryMultiJvmSpec)
  with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
  import AttemptSysMsgRedeliveryMultiJvmSpec._

  // All declared roles take part in the test.
  def initialParticipants = roles.size

  "AttemptSysMsgRedelivery" must {
    "redeliver system message after inactivity" taggedAs LongRunningTest in {
      // Every node starts a local echo actor under the same name.
      system.actorOf(Props[Echo], "echo")
      enterBarrier("echo-started")

      // Resolve the echo actors living on `first` and `second` into ActorRefs.
      // `.ref.get` fails the test immediately if either identification comes back empty.
      system.actorSelection(node(first) / "user" / "echo") ! Identify(None)
      val firstRef: ActorRef = expectMsgType[ActorIdentity].ref.get
      system.actorSelection(node(second) / "user" / "echo") ! Identify(None)
      val secondRef: ActorRef = expectMsgType[ActorIdentity].ref.get
      enterBarrier("refs-retrieved")

      // Only `first` issues the command; it blackholes first <-> second in both directions.
      runOn(first) {
        testConductor.blackhole(first, second, Direction.Both).await
      }
      enterBarrier("blackhole")

      // The watches are registered while the first <-> second link is still blackholed.
      // `third` also watches the echo on `second`; its link is not part of the blackhole.
      runOn(first, third) {
        watch(secondRef)
      }
      runOn(second) {
        watch(firstRef)
      }
      enterBarrier("watch-established")

      // Restore traffic between first and second.
      runOn(first) {
        testConductor.passThrough(first, second, Direction.Both).await
      }
      enterBarrier("pass-through")

      // Each node stops its own local echo actor ...
      system.actorSelection("/user/echo") ! PoisonPill

      // ... and every watcher must be told about the termination within 10 seconds.
      runOn(first, third) {
        expectTerminated(secondRef, 10.seconds)
      }
      runOn(second) {
        expectTerminated(firstRef, 10.seconds)
      }
      enterBarrier("done")
    }
  }
}

View file

@ -99,6 +99,8 @@ final case class AckedSendBuffer[T <: HasSequenceNumber](
* @return An updated buffer containing the remaining unacknowledged messages
*/
def acknowledge(ack: Ack): AckedSendBuffer[T] = {
if (ack.cumulativeAck > maxSeq)
throw new IllegalArgumentException(s"Highest SEQ so far was $maxSeq but cumulative ACK is ${ack.cumulativeAck}")
val newNacked = (nacked ++ nonAcked) filter { m ack.nacks(m.seq) }
if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException
else this.copy(
@ -121,7 +123,7 @@ final case class AckedSendBuffer[T <: HasSequenceNumber](
this.copy(nonAcked = this.nonAcked :+ msg, maxSeq = msg.seq)
}
override def toString = nonAcked.map(_.seq).mkString("[", ", ", "]")
override def toString = s"[$maxSeq ${nonAcked.map(_.seq).mkString("{", ", ", "}")}]"
}
/**

View file

@ -194,18 +194,15 @@ private[remote] class ReliableDeliverySupervisor(
import ReliableDeliverySupervisor._
import context.dispatcher
var autoResendTimer: Option[Cancellable] = None
val autoResendTimer = context.system.scheduler.schedule(
settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
def scheduleAutoResend(): Unit = if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) {
if (autoResendTimer.isEmpty)
autoResendTimer = Some(context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery))
}
def rescheduleAutoResend(): Unit = {
autoResendTimer.foreach(_.cancel())
autoResendTimer = None
scheduleAutoResend()
}
// Aim for a maximum of 100 resend/s = 0.1 resend/ms
val maxResendRate = 100.0
val resendLimit =
math.min(
1000,
math.max((settings.SysResendTimeout.toMillis * (maxResendRate / 1000.0)).toInt, 1))
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e @ (_: AssociationProblem) Escalate
@ -214,6 +211,8 @@ private[remote] class ReliableDeliverySupervisor(
log.warning("Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}",
remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy)
uidConfirmed = false // Need confirmation of UID again
if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
context.become(gated)
currentHandle = None
context.parent ! StoppedReading(self)
@ -223,16 +222,12 @@ private[remote] class ReliableDeliverySupervisor(
var currentHandle: Option[AkkaProtocolHandle] = handleOrActive
var resendBuffer: AckedSendBuffer[Send] = _
var lastCumulativeAck: SeqNo = _
var seqCounter: Long = _
var pendingAcks = Vector.empty[Ack]
def reset() {
resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
scheduleAutoResend()
lastCumulativeAck = SeqNo(-1)
seqCounter = 0L
pendingAcks = Vector.empty
bailoutAt = None
}
reset()
@ -245,7 +240,7 @@ private[remote] class ReliableDeliverySupervisor(
var writer: ActorRef = createWriter()
var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid }
val bailoutAt: Deadline = Deadline.now + settings.InitialSysMsgDeliveryTimeout
var bailoutAt: Option[Deadline] = None
// Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending on whether the
// UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for
// any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore
@ -254,11 +249,6 @@ private[remote] class ReliableDeliverySupervisor(
// (This actor is never restarted)
var uidConfirmed: Boolean = uid.isDefined
def unstashAcks(): Unit = {
pendingAcks foreach (self ! _)
pendingAcks = Vector.empty
}
override def postStop(): Unit = {
// All remaining messages in the buffer have to be delivered to dead letters. It is important to clear the sequence
// number otherwise deadLetters will ignore it to avoid reporting system messages as dead letters while they are
@ -268,6 +258,7 @@ private[remote] class ReliableDeliverySupervisor(
// the remote system later.
(resendBuffer.nacked ++ resendBuffer.nonAcked) foreach { s context.system.deadLetters ! s.copy(seqOpt = None) }
receiveBuffers.remove(Link(localAddress, remoteAddress))
autoResendTimer.cancel()
}
override def postRestart(reason: Throwable): Unit = {
@ -284,22 +275,16 @@ private[remote] class ReliableDeliverySupervisor(
case s: Send
handleSend(s)
case ack: Ack
if (!uidConfirmed) pendingAcks = pendingAcks :+ ack
else {
// If we are not sure about the UID just ignore the ack. Ignoring is fine.
if (uidConfirmed) {
try resendBuffer = resendBuffer.acknowledge(ack)
catch {
case NonFatal(e)
throw new InvalidAssociationException(s"Error encountered while processing system message acknowledgement $resendBuffer $ack", e)
throw new HopelessAssociation(localAddress, remoteAddress, uid,
new IllegalStateException(s"Error encountered while processing system message " +
s"acknowledgement buffer: $resendBuffer ack: $ack", e))
}
if (lastCumulativeAck < ack.cumulativeAck) {
lastCumulativeAck = ack.cumulativeAck
// Cumulative ack is progressing, we might not need to resend non-acked messages yet.
// If this progression stops, the timer will eventually kick in, since scheduleAutoResend
// does not cancel existing timers (see the "else" case).
rescheduleAutoResend()
} else scheduleAutoResend()
resendNacked()
}
case AttemptSysMsgRedelivery
@ -311,11 +296,11 @@ private[remote] class ReliableDeliverySupervisor(
context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
context.become(idle)
case g @ GotUid(receivedUid, _)
bailoutAt = None
context.parent ! g
// New system that has the same address as the old - need to start from fresh state
uidConfirmed = true
if (uid.exists(_ != receivedUid)) reset()
else unstashAcks()
uid = Some(receivedUid)
resendAll()
@ -333,13 +318,14 @@ private[remote] class ReliableDeliverySupervisor(
// remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable
// again it will be immediately quarantined due to the out-of-sync system message buffer.
// In other words, this action is safe.
if (!uidConfirmed && bailoutAt.isOverdue())
throw new InvalidAssociation(localAddress, remoteAddress,
if (bailoutAt.exists(_.isOverdue()))
throw new HopelessAssociation(localAddress, remoteAddress, uid,
new java.util.concurrent.TimeoutException("Delivery of system messages timed out and they were dropped."))
writer = createWriter()
// Resending will be triggered by the incoming GotUid message after the connection finished
context.become(receive)
} else context.become(idle)
case AttemptSysMsgRedelivery // Ignore
case s @ Send(msg: SystemMessage, _, _, _) tryBuffer(s.copy(seqOpt = Some(nextSeq())))
case s: Send context.system.deadLetters ! s
case EndpointWriter.FlushAndStop context.stop(self)
@ -355,9 +341,11 @@ private[remote] class ReliableDeliverySupervisor(
handleSend(s)
context.become(receive)
case AttemptSysMsgRedelivery
if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) {
writer = createWriter()
// Resending will be triggered by the incoming GotUid message after the connection finished
context.become(receive)
}
case EndpointWriter.FlushAndStop context.stop(self)
case EndpointWriter.StopReading(w, replyTo)
replyTo ! EndpointWriter.StoppedReading(w)
@ -378,15 +366,15 @@ private[remote] class ReliableDeliverySupervisor(
tryBuffer(sequencedSend)
// If we have not confirmed the remote UID we cannot transfer the system message at this point, so just buffer it.
// GotUid will kick resendAll() causing the messages to be properly written
if (uidConfirmed) writer ! sequencedSend
if (uidConfirmed)
writer ! sequencedSend
} else writer ! send
private def resendNacked(): Unit = resendBuffer.nacked foreach { writer ! _ }
private def resendAll(): Unit = {
resendNacked()
resendBuffer.nonAcked foreach { writer ! _ }
rescheduleAutoResend()
resendBuffer.nonAcked.take(resendLimit) foreach { writer ! _ }
}
private def tryBuffer(s: Send): Unit =
@ -810,6 +798,8 @@ private[remote] class EndpointWriter(
context.stop(self)
case OutboundAck(ack)
lastAck = Some(ack)
if (ackDeadline.isOverdue())
trySendPureAck()
case AckIdleCheckTimer // Ignore
case FlushAndStopTimeout // ignore
case BackoffTimer // ignore

View file

@ -459,7 +459,9 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
Stop
case HopelessAssociation(localAddress, remoteAddress, Some(uid), _)
case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason)
log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.",
remoteAddress, uid)
settings.QuarantineDuration match {
case d: FiniteDuration
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
@ -673,6 +675,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint)
else {
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
if (!endpoints.hasWritableEndpointFor(handle.remoteAddress))
endpoints.removePolicy(handle.remoteAddress)
}
}

View file

@ -471,7 +471,7 @@ private[transport] class ThrottledAssociation(
case Event(Disassociated(info), _)
stop() // not notifying the upstream handler is intentional: we are relying on heartbeating
case Event(FailWith(reason), _)
upstreamListener notify Disassociated(reason)
if (upstreamListener ne null) upstreamListener notify Disassociated(reason)
stop()
}

View file

@ -130,7 +130,7 @@ class AckedDeliverySpec extends AkkaSpec {
val b8 = b7.acknowledge(Ack(SeqNo(2)))
b8.nonAcked should be(Vector(msg3, msg4))
val b9 = b8.acknowledge(Ack(SeqNo(5)))
val b9 = b8.acknowledge(Ack(SeqNo(4)))
b9.nonAcked should be(Vector.empty)
}
@ -164,7 +164,7 @@ class AckedDeliverySpec extends AkkaSpec {
b6.nonAcked should be(Vector())
b6.nacked should be(Vector(msg2, msg3))
val b7 = b6.acknowledge(Ack(SeqNo(5)))
val b7 = b6.acknowledge(Ack(SeqNo(4)))
b7.nonAcked should be(Vector.empty)
b7.nacked should be(Vector.empty)
}

View file

@ -3,6 +3,7 @@
*/
package akka.remote.transport
import akka.remote.transport.ThrottlerTransportAdapter._
import akka.testkit.TimingTest
import akka.testkit.DefaultTimeout
import akka.testkit.ImplicitSender
@ -12,8 +13,7 @@ import AkkaProtocolStressTest._
import akka.actor._
import scala.concurrent.duration._
import akka.testkit._
import akka.remote.EndpointException
import akka.remote.{ RARP, EndpointException }
import akka.remote.{ QuarantinedEvent, EndpointException, RARP }
import akka.remote.transport.FailureInjectorTransportAdapter.{ One, All, Drop }
import scala.concurrent.Await
import akka.actor.ActorRef
@ -32,7 +32,11 @@ import akka.dispatch.sysmsg.{ Failed, SystemMessage }
import akka.pattern.pipe
object SystemMessageDeliveryStressTest {
val baseConfig: Config = ConfigFactory parseString ("""
val msgCount = 5000
val burstSize = 100
val burstDelay = 500.millis
val baseConfig: Config = ConfigFactory parseString (s"""
akka {
#loglevel = DEBUG
actor.provider = "akka.remote.RemoteActorRefProvider"
@ -41,18 +45,18 @@ object SystemMessageDeliveryStressTest {
remote.log-remote-lifecycle-events = on
remote.transport-failure-detector {
threshold = 1.0
max-sample-size = 2
min-std-deviation = 1 ms
heartbeat-interval = 500 ms
acceptable-heartbeat-pause = 2 s
heartbeat-interval = 1 s
acceptable-heartbeat-pause = 5 s
}
remote.system-message-buffer-size = $msgCount
## Keep this setting tight, otherwise the test takes a long time or times out
remote.resend-interval = 0.5 s
remote.resend-interval = 2 s
remote.system-message-ack-piggyback-timeout = 100 ms // Force heavy Ack traffic
remote.initial-system-message-delivery-timeout = 10 m
remote.use-passive-connections = on
remote.netty.tcp {
applied-adapters = ["gremlin"]
applied-adapters = ["gremlin", "trttl"]
port = 0
}
@ -75,8 +79,11 @@ object SystemMessageDeliveryStressTest {
}
}
class SystemMessageSender(val msgCount: Int, val target: ActorRef) extends Actor {
class SystemMessageSender(val msgCount: Int, val burstSize: Int, val burstDelay: FiniteDuration, val target: ActorRef) extends Actor {
import context.dispatcher
var counter = 0
var burstCounter = 0
val targetRef = target.asInstanceOf[InternalActorRef]
override def preStart(): Unit = self ! "sendnext"
@ -85,7 +92,15 @@ object SystemMessageDeliveryStressTest {
case "sendnext"
targetRef.sendSystemMessage(Failed(null, null, counter))
counter += 1
if (counter < msgCount) self ! "sendnext"
burstCounter += 1
if (counter < msgCount) {
if (burstCounter < burstSize) self ! "sendnext"
else {
burstCounter = 0
context.system.scheduler.scheduleOnce(burstDelay, self, "sendnext")
}
}
}
}
@ -97,36 +112,69 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String)
with DefaultTimeout {
import SystemMessageDeliveryStressTest._
val systemB = ActorSystem("systemB", system.settings.config)
val sysMsgVerifier = new SystemMessageSequenceVerifier(system, testActor)
val MsgCount = 100
override def expectedTestDuration: FiniteDuration = 120.seconds
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val systemA = system
val systemB = ActorSystem("systemB", system.settings.config)
val probeA = TestProbe()(systemA)
val probeB = TestProbe()(systemB)
val sysMsgVerifierA = new SystemMessageSequenceVerifier(systemA, probeA.ref)
val sysMsgVerifierB = new SystemMessageSequenceVerifier(systemB, probeB.ref)
val addressA = systemA.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val root = RootActorPath(address)
// We test internals here (system message delivery) so we are allowed to cheat
val there = RARP(systemB).provider.resolveActorRef(root / "temp" / sysMsgVerifier.path.name).asInstanceOf[InternalActorRef]
val targetForA = RARP(systemA).provider.resolveActorRef(RootActorPath(addressB) / "temp" / sysMsgVerifierB.path.name)
val targetForB = RARP(systemB).provider.resolveActorRef(RootActorPath(addressA) / "temp" / sysMsgVerifierA.path.name)
override def atStartup() = {
system.eventStream.publish(TestEvent.Mute(
systemA.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*")))
EventFilter.warning(pattern = "received dead .*")))
systemB.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*")))
EventFilter.warning(pattern = "received dead .*")))
systemA.eventStream.subscribe(probeA.ref, classOf[QuarantinedEvent])
systemB.eventStream.subscribe(probeB.ref, classOf[QuarantinedEvent])
}
"Remoting " + msg must {
"guaranteed delivery and message ordering despite packet loss " taggedAs TimingTest in {
Await.result(RARP(systemB).provider.transport.managementCommand(One(address, Drop(0.3, 0.3))), 3.seconds.dilated)
systemB.actorOf(Props(classOf[SystemMessageSender], MsgCount, there))
import systemA.dispatcher
val toSend = (0 until MsgCount).toList
val received = expectMsgAllOf(45.seconds, toSend: _*)
val transportA = RARP(systemA).provider.transport
val transportB = RARP(systemB).provider.transport
received should be(toSend)
Await.result(transportA.managementCommand(One(addressB, Drop(0.1, 0.1))), 3.seconds.dilated)
Await.result(transportB.managementCommand(One(addressA, Drop(0.1, 0.1))), 3.seconds.dilated)
// Schedule periodic disassociates
systemA.scheduler.schedule(3.second, 8.seconds) {
transportA.managementCommand(ForceDisassociateExplicitly(addressB, reason = AssociationHandle.Unknown))
}
systemB.scheduler.schedule(7.seconds, 8.seconds) {
transportB.managementCommand(ForceDisassociateExplicitly(addressA, reason = AssociationHandle.Unknown))
}
systemB.actorOf(Props(classOf[SystemMessageSender], msgCount, burstSize, burstDelay, targetForB))
systemA.actorOf(Props(classOf[SystemMessageSender], msgCount, burstSize, burstDelay, targetForA))
val toSend = (0 until msgCount).toList
var maxDelay = 0L
for (m 0 until msgCount) {
val start = System.currentTimeMillis()
probeB.expectMsg(10.minutes, m)
probeA.expectMsg(10.minutes, m)
maxDelay = math.max(maxDelay, (System.currentTimeMillis() - start) / 1000)
}
}
}

View file

@ -347,7 +347,7 @@ object AkkaBuild extends Build {
lazy val mimaIgnoredProblems = {
Seq(
// add filters here, see release-2.2 branch
// add filters here, see release-2.3 branch
)
}