diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/AttemptSysMsgRedeliverySpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/AttemptSysMsgRedeliverySpec.scala new file mode 100644 index 0000000000..d1f509ecd0 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/AttemptSysMsgRedeliverySpec.scala @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.cluster + +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorRef +import akka.actor.Identify +import akka.actor.Props +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec } +import akka.actor.PoisonPill + +object AttemptSysMsgRedeliveryMultiJvmSpec extends MultiNodeConfig { + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) + + class Echo extends Actor { + def receive = { + case m ⇒ sender ! m + } + } +} + +class AttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec +class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec +class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec + +class AttemptSysMsgRedeliverySpec extends MultiNodeSpec(AttemptSysMsgRedeliveryMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with DefaultTimeout { + import AttemptSysMsgRedeliveryMultiJvmSpec._ + + "AttemptSysMsgRedelivery" must { + "reach initial convergence" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third) + + enterBarrier("after-1") + } + + "redeliver system message after inactivity" taggedAs LongRunningTest in { + system.actorOf(Props[Echo], "echo") + enterBarrier("echo-started") + + system.actorSelection(node(first) / "user" / "echo") ! Identify(None) + val firstRef: ActorRef = expectMsgType[ActorIdentity].ref.get + system.actorSelection(node(second) / "user" / "echo") ! Identify(None) + val secondRef: ActorRef = expectMsgType[ActorIdentity].ref.get + enterBarrier("refs-retrieved") + + runOn(first) { + testConductor.blackhole(first, second, Direction.Both).await + } + enterBarrier("blackhole") + + runOn(first, third) { + watch(secondRef) + } + runOn(second) { + watch(firstRef) + } + enterBarrier("watch-established") + + runOn(first) { + testConductor.passThrough(first, second, Direction.Both).await + } + enterBarrier("pass-through") + + system.actorSelection("/user/echo") ! PoisonPill + + runOn(first, third) { + expectTerminated(secondRef, 10.seconds) + } + runOn(second) { + expectTerminated(firstRef, 10.seconds) + } + + enterBarrier("done") + } + } + +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala new file mode 100644 index 0000000000..cd8df7aaa4 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.remote + +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorRef +import akka.actor.Identify +import akka.actor.Props +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec } +import akka.actor.PoisonPill + +object AttemptSysMsgRedeliveryMultiJvmSpec extends MultiNodeConfig { + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false)) + + testTransport(on = true) + + class Echo extends Actor { + def receive = { + case m ⇒ sender ! m + } + } +} + +class AttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec +class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec +class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec + +class AttemptSysMsgRedeliverySpec extends MultiNodeSpec(AttemptSysMsgRedeliveryMultiJvmSpec) + with STMultiNodeSpec with ImplicitSender with DefaultTimeout { + import AttemptSysMsgRedeliveryMultiJvmSpec._ + + def initialParticipants = roles.size + + "AttemptSysMsgRedelivery" must { + "redeliver system message after inactivity" taggedAs LongRunningTest in { + system.actorOf(Props[Echo], "echo") + enterBarrier("echo-started") + + system.actorSelection(node(first) / "user" / "echo") ! Identify(None) + val firstRef: ActorRef = expectMsgType[ActorIdentity].ref.get + system.actorSelection(node(second) / "user" / "echo") ! Identify(None) + val secondRef: ActorRef = expectMsgType[ActorIdentity].ref.get + enterBarrier("refs-retrieved") + + runOn(first) { + testConductor.blackhole(first, second, Direction.Both).await + } + enterBarrier("blackhole") + + runOn(first, third) { + watch(secondRef) + } + runOn(second) { + watch(firstRef) + } + enterBarrier("watch-established") + + runOn(first) { + testConductor.passThrough(first, second, Direction.Both).await + } + enterBarrier("pass-through") + + system.actorSelection("/user/echo") ! PoisonPill + + runOn(first, third) { + expectTerminated(secondRef, 10.seconds) + } + runOn(second) { + expectTerminated(firstRef, 10.seconds) + } + + enterBarrier("done") + } + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala index c501553b2a..6aa8ac147b 100644 --- a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala @@ -99,6 +99,8 @@ final case class AckedSendBuffer[T <: HasSequenceNumber]( * @return An updated buffer containing the remaining unacknowledged messages */ def acknowledge(ack: Ack): AckedSendBuffer[T] = { + if (ack.cumulativeAck > maxSeq) + throw new IllegalArgumentException(s"Highest SEQ so far was $maxSeq but cumulative ACK is ${ack.cumulativeAck}") val newNacked = (nacked ++ nonAcked) filter { m ⇒ ack.nacks(m.seq) } if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException else this.copy( @@ -121,7 +123,7 @@ final case class AckedSendBuffer[T <: HasSequenceNumber]( this.copy(nonAcked = this.nonAcked :+ msg, maxSeq = msg.seq) } - override def toString = nonAcked.map(_.seq).mkString("[", ", ", "]") + override def toString = s"[$maxSeq ${nonAcked.map(_.seq).mkString("{", ", ", "}")}]" } /** diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 637d38ed2e..b0faae052f 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -194,18 +194,15 @@ private[remote] class ReliableDeliverySupervisor( import ReliableDeliverySupervisor._ import context.dispatcher - var autoResendTimer: Option[Cancellable] = None + val autoResendTimer = context.system.scheduler.schedule( + settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery) - def scheduleAutoResend(): Unit = if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) { - if (autoResendTimer.isEmpty) - autoResendTimer = Some(context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)) - } - - def rescheduleAutoResend(): Unit = { - autoResendTimer.foreach(_.cancel()) - autoResendTimer = None - scheduleAutoResend() - } + // Aim for a maximum of 100 resend/s = 0.1 resend/ms + val maxResendRate = 100.0 + val resendLimit = + math.min( + 1000, + math.max((settings.SysResendTimeout.toMillis * (maxResendRate / 1000.0)).toInt, 1)) override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case e @ (_: AssociationProblem) ⇒ Escalate @@ -214,6 +211,8 @@ private[remote] class ReliableDeliverySupervisor( log.warning("Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}", remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy) uidConfirmed = false // Need confirmation of UID again + if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty) + bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout) context.become(gated) currentHandle = None context.parent ! StoppedReading(self) @@ -223,16 +222,12 @@ private[remote] class ReliableDeliverySupervisor( var currentHandle: Option[AkkaProtocolHandle] = handleOrActive var resendBuffer: AckedSendBuffer[Send] = _ - var lastCumulativeAck: SeqNo = _ var seqCounter: Long = _ - var pendingAcks = Vector.empty[Ack] def reset() { resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize) - scheduleAutoResend() - lastCumulativeAck = SeqNo(-1) seqCounter = 0L - pendingAcks = Vector.empty + bailoutAt = None } reset() @@ -245,7 +240,7 @@ private[remote] class ReliableDeliverySupervisor( var writer: ActorRef = createWriter() var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid } - val bailoutAt: Deadline = Deadline.now + settings.InitialSysMsgDeliveryTimeout + var bailoutAt: Option[Deadline] = None // Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the // UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for // any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore @@ -254,11 +249,6 @@ private[remote] class ReliableDeliverySupervisor( // (This actor is never restarted) var uidConfirmed: Boolean = uid.isDefined - def unstashAcks(): Unit = { - pendingAcks foreach (self ! _) - pendingAcks = Vector.empty - } - override def postStop(): Unit = { // All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence // number otherwise deadLetters will ignore it to avoid reporting system messages as dead letters while they are @@ -268,6 +258,7 @@ private[remote] class ReliableDeliverySupervisor( // the remote system later. (resendBuffer.nacked ++ resendBuffer.nonAcked) foreach { s ⇒ context.system.deadLetters ! s.copy(seqOpt = None) } receiveBuffers.remove(Link(localAddress, remoteAddress)) + autoResendTimer.cancel() } override def postRestart(reason: Throwable): Unit = { @@ -284,22 +275,16 @@ private[remote] class ReliableDeliverySupervisor( case s: Send ⇒ handleSend(s) case ack: Ack ⇒ - if (!uidConfirmed) pendingAcks = pendingAcks :+ ack - else { + // If we are not sure about the UID just ignore the ack. Ignoring is fine. + if (uidConfirmed) { try resendBuffer = resendBuffer.acknowledge(ack) catch { case NonFatal(e) ⇒ - throw new InvalidAssociationException(s"Error encountered while processing system message acknowledgement $resendBuffer $ack", e) + throw new HopelessAssociation(localAddress, remoteAddress, uid, + new IllegalStateException(s"Error encountered while processing system message " + + s"acknowledgement buffer: $resendBuffer ack: $ack", e)) } - if (lastCumulativeAck < ack.cumulativeAck) { - lastCumulativeAck = ack.cumulativeAck - // Cumulative ack is progressing, we might not need to resend non-acked messages yet. - // If this progression stops, the timer will eventually kick in, since scheduleAutoResend - // does not cancel existing timers (see the "else" case). - rescheduleAutoResend() - } else scheduleAutoResend() - resendNacked() } case AttemptSysMsgRedelivery ⇒ @@ -311,11 +296,11 @@ private[remote] class ReliableDeliverySupervisor( context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery) context.become(idle) case g @ GotUid(receivedUid, _) ⇒ + bailoutAt = None context.parent ! g // New system that has the same address as the old - need to start from fresh state uidConfirmed = true if (uid.exists(_ != receivedUid)) reset() - else unstashAcks() uid = Some(receivedUid) resendAll() @@ -333,13 +318,14 @@ private[remote] class ReliableDeliverySupervisor( // remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable // again it will be immediately quarantined due to out-of-sync system message buffer and becomes quarantined. // In other words, this action is safe. - if (!uidConfirmed && bailoutAt.isOverdue()) - throw new InvalidAssociation(localAddress, remoteAddress, + if (bailoutAt.exists(_.isOverdue())) + throw new HopelessAssociation(localAddress, remoteAddress, uid, new java.util.concurrent.TimeoutException("Delivery of system messages timed out and they were dropped.")) writer = createWriter() // Resending will be triggered by the incoming GotUid message after the connection finished context.become(receive) } else context.become(idle) + case AttemptSysMsgRedelivery ⇒ // Ignore case s @ Send(msg: SystemMessage, _, _, _) ⇒ tryBuffer(s.copy(seqOpt = Some(nextSeq()))) case s: Send ⇒ context.system.deadLetters ! s case EndpointWriter.FlushAndStop ⇒ context.stop(self) @@ -355,9 +341,11 @@ private[remote] class ReliableDeliverySupervisor( handleSend(s) context.become(receive) case AttemptSysMsgRedelivery ⇒ - writer = createWriter() - // Resending will be triggered by the incoming GotUid message after the connection finished - context.become(receive) + if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) { + writer = createWriter() + // Resending will be triggered by the incoming GotUid message after the connection finished + context.become(receive) + } case EndpointWriter.FlushAndStop ⇒ context.stop(self) case EndpointWriter.StopReading(w, replyTo) ⇒ replyTo ! EndpointWriter.StoppedReading(w) @@ -378,15 +366,15 @@ private[remote] class ReliableDeliverySupervisor( tryBuffer(sequencedSend) // If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it. // GotUid will kick resendAll() causing the messages to be properly written - if (uidConfirmed) writer ! sequencedSend + if (uidConfirmed) + writer ! sequencedSend } else writer ! send private def resendNacked(): Unit = resendBuffer.nacked foreach { writer ! _ } private def resendAll(): Unit = { resendNacked() - resendBuffer.nonAcked foreach { writer ! _ } - rescheduleAutoResend() + resendBuffer.nonAcked.take(resendLimit) foreach { writer ! _ } } private def tryBuffer(s: Send): Unit = @@ -810,6 +798,8 @@ private[remote] class EndpointWriter( context.stop(self) case OutboundAck(ack) ⇒ lastAck = Some(ack) + if (ackDeadline.isOverdue()) + trySendPureAck() case AckIdleCheckTimer ⇒ // Ignore case FlushAndStopTimeout ⇒ // ignore case BackoffTimer ⇒ // ignore diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 67819e29c8..daf298df2d 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -459,7 +459,9 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress)) Stop - case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒ + case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒ + log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.", + remoteAddress, uid) settings.QuarantineDuration match { case d: FiniteDuration ⇒ endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) @@ -673,7 +675,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint) else { endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint) - endpoints.removePolicy(handle.remoteAddress) + if (!endpoints.hasWritableEndpointFor(handle.remoteAddress)) + endpoints.removePolicy(handle.remoteAddress) } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 82c7ca6959..3525324a86 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -471,7 +471,7 @@ private[transport] class ThrottledAssociation( case Event(Disassociated(info), _) ⇒ stop() // not notifying the upstream handler is intentional: we are relying on heartbeating case Event(FailWith(reason), _) ⇒ - upstreamListener notify Disassociated(reason) + if (upstreamListener ne null) upstreamListener notify Disassociated(reason) stop() } diff --git a/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala index d18bb9b285..ea61a059f3 100644 --- a/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala @@ -130,7 +130,7 @@ class AckedDeliverySpec extends AkkaSpec { val b8 = b7.acknowledge(Ack(SeqNo(2))) b8.nonAcked should be(Vector(msg3, msg4)) - val b9 = b8.acknowledge(Ack(SeqNo(5))) + val b9 = b8.acknowledge(Ack(SeqNo(4))) b9.nonAcked should be(Vector.empty) } @@ -164,7 +164,7 @@ class AckedDeliverySpec extends AkkaSpec { b6.nonAcked should be(Vector()) b6.nacked should be(Vector(msg2, msg3)) - val b7 = b6.acknowledge(Ack(SeqNo(5))) + val b7 = b6.acknowledge(Ack(SeqNo(4))) b7.nonAcked should be(Vector.empty) b7.nacked should be(Vector.empty) } diff --git a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala index 4e8ff810e0..021281c183 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala @@ -3,6 +3,7 @@ */ package akka.remote.transport +import akka.remote.transport.ThrottlerTransportAdapter._ import akka.testkit.TimingTest import akka.testkit.DefaultTimeout import akka.testkit.ImplicitSender @@ -12,8 +13,7 @@ import AkkaProtocolStressTest._ import akka.actor._ import scala.concurrent.duration._ import akka.testkit._ -import akka.remote.EndpointException -import akka.remote.{ RARP, EndpointException } +import akka.remote.{ QuarantinedEvent, EndpointException, RARP } import akka.remote.transport.FailureInjectorTransportAdapter.{ One, All, Drop } import scala.concurrent.Await import akka.actor.ActorRef @@ -32,7 +32,11 @@ import akka.dispatch.sysmsg.{ Failed, SystemMessage } import akka.pattern.pipe object SystemMessageDeliveryStressTest { - val baseConfig: Config = ConfigFactory parseString (""" + val msgCount = 5000 + val burstSize = 100 + val burstDelay = 500.millis + + val baseConfig: Config = ConfigFactory parseString (s""" akka { #loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" @@ -41,18 +45,18 @@ object SystemMessageDeliveryStressTest { remote.log-remote-lifecycle-events = on remote.transport-failure-detector { - threshold = 1.0 - max-sample-size = 2 - min-std-deviation = 1 ms - heartbeat-interval = 500 ms - acceptable-heartbeat-pause = 2 s + heartbeat-interval = 1 s + acceptable-heartbeat-pause = 5 s } + remote.system-message-buffer-size = $msgCount ## Keep this setting tight, otherwise the test takes a long time or times out - remote.resend-interval = 0.5 s + remote.resend-interval = 2 s + remote.system-message-ack-piggyback-timeout = 100 ms // Force heavy Ack traffic + remote.initial-system-message-delivery-timeout = 10 m remote.use-passive-connections = on remote.netty.tcp { - applied-adapters = ["gremlin"] + applied-adapters = ["gremlin", "trttl"] port = 0 } @@ -75,8 +79,11 @@ object SystemMessageDeliveryStressTest { } } - class SystemMessageSender(val msgCount: Int, val target: ActorRef) extends Actor { + class SystemMessageSender(val msgCount: Int, val burstSize: Int, val burstDelay: FiniteDuration, val target: ActorRef) extends Actor { + import context.dispatcher + var counter = 0 + var burstCounter = 0 val targetRef = target.asInstanceOf[InternalActorRef] override def preStart(): Unit = self ! "sendnext" @@ -85,7 +92,15 @@ object SystemMessageDeliveryStressTest { case "sendnext" ⇒ targetRef.sendSystemMessage(Failed(null, null, counter)) counter += 1 - if (counter < msgCount) self ! "sendnext" + burstCounter += 1 + + if (counter < msgCount) { + if (burstCounter < burstSize) self ! "sendnext" + else { + burstCounter = 0 + context.system.scheduler.scheduleOnce(burstDelay, self, "sendnext") + } + } } } @@ -97,36 +112,69 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String) with DefaultTimeout { import SystemMessageDeliveryStressTest._ - val systemB = ActorSystem("systemB", system.settings.config) - val sysMsgVerifier = new SystemMessageSequenceVerifier(system, testActor) - val MsgCount = 100 + override def expectedTestDuration: FiniteDuration = 120.seconds - val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val systemA = system + val systemB = ActorSystem("systemB", system.settings.config) + + val probeA = TestProbe()(systemA) + val probeB = TestProbe()(systemB) + + val sysMsgVerifierA = new SystemMessageSequenceVerifier(systemA, probeA.ref) + val sysMsgVerifierB = new SystemMessageSequenceVerifier(systemB, probeB.ref) + + val addressA = systemA.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - val root = RootActorPath(address) // We test internals here (system message delivery) so we are allowed to cheat - val there = RARP(systemB).provider.resolveActorRef(root / "temp" / sysMsgVerifier.path.name).asInstanceOf[InternalActorRef] + val targetForA = RARP(systemA).provider.resolveActorRef(RootActorPath(addressB) / "temp" / sysMsgVerifierB.path.name) + val targetForB = RARP(systemB).provider.resolveActorRef(RootActorPath(addressA) / "temp" / sysMsgVerifierA.path.name) override def atStartup() = { - system.eventStream.publish(TestEvent.Mute( + systemA.eventStream.publish(TestEvent.Mute( + EventFilter[EndpointException](), EventFilter.error(start = "AssociationError"), - EventFilter.warning(pattern = "received dead letter.*"))) + EventFilter.warning(pattern = "received dead .*"))) systemB.eventStream.publish(TestEvent.Mute( EventFilter[EndpointException](), EventFilter.error(start = "AssociationError"), - EventFilter.warning(pattern = "received dead letter.*"))) + EventFilter.warning(pattern = "received dead .*"))) + + systemA.eventStream.subscribe(probeA.ref, classOf[QuarantinedEvent]) + systemB.eventStream.subscribe(probeB.ref, classOf[QuarantinedEvent]) } "Remoting " + msg must { "guaranteed delivery and message ordering despite packet loss " taggedAs TimingTest in { - Await.result(RARP(systemB).provider.transport.managementCommand(One(address, Drop(0.3, 0.3))), 3.seconds.dilated) - systemB.actorOf(Props(classOf[SystemMessageSender], MsgCount, there)) + import systemA.dispatcher - val toSend = (0 until MsgCount).toList - val received = expectMsgAllOf(45.seconds, toSend: _*) + val transportA = RARP(systemA).provider.transport + val transportB = RARP(systemB).provider.transport - received should be(toSend) + Await.result(transportA.managementCommand(One(addressB, Drop(0.1, 0.1))), 3.seconds.dilated) + Await.result(transportB.managementCommand(One(addressA, Drop(0.1, 0.1))), 3.seconds.dilated) + + // Schedule peridodic disassociates + systemA.scheduler.schedule(3.second, 8.seconds) { + transportA.managementCommand(ForceDisassociateExplicitly(addressB, reason = AssociationHandle.Unknown)) + } + + systemB.scheduler.schedule(7.seconds, 8.seconds) { + transportB.managementCommand(ForceDisassociateExplicitly(addressA, reason = AssociationHandle.Unknown)) + } + + systemB.actorOf(Props(classOf[SystemMessageSender], msgCount, burstSize, burstDelay, targetForB)) + systemA.actorOf(Props(classOf[SystemMessageSender], msgCount, burstSize, burstDelay, targetForA)) + + val toSend = (0 until msgCount).toList + var maxDelay = 0L + + for (m ← 0 until msgCount) { + val start = System.currentTimeMillis() + probeB.expectMsg(10.minutes, m) + probeA.expectMsg(10.minutes, m) + maxDelay = math.max(maxDelay, (System.currentTimeMillis() - start) / 1000) + } } } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index e55793545e..02ec37cbf5 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -347,7 +347,7 @@ object AkkaBuild extends Build { lazy val mimaIgnoredProblems = { Seq( - // add filters here, see release-2.2 branch + // add filters here, see release-2.3 branch ) }