* sequence numbers must, of course, be tracked by origin system * add unit test for SystemMessageAcker stage * enable ArteryRemoteRoundRobinSpec
This commit is contained in:
parent
e818887bb2
commit
b6a94e1758
6 changed files with 153 additions and 27 deletions
|
|
@ -37,13 +37,12 @@ class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec(
|
||||||
class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
|
class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
|
||||||
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false))
|
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false))
|
||||||
|
|
||||||
// FIXME this is failing with Artery
|
class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec(
|
||||||
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec(
|
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||||
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec(
|
||||||
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec(
|
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||||
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
|
||||||
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
|
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||||
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
|
||||||
|
|
||||||
object AttemptSysMsgRedeliverySpec {
|
object AttemptSysMsgRedeliverySpec {
|
||||||
class Echo extends Actor {
|
class Echo extends Actor {
|
||||||
|
|
|
||||||
|
|
@ -134,11 +134,6 @@ abstract class RemoteReDeploymentMultiJvmSpec(multiNodeConfig: RemoteReDeploymen
|
||||||
|
|
||||||
enterBarrier("first-deployed")
|
enterBarrier("first-deployed")
|
||||||
|
|
||||||
// FIXME When running with Artery:
|
|
||||||
// [akka://RemoteReDeploymentMultiJvmSpec/user/parent] received Supervise from unregistered child
|
|
||||||
// Actor[artery://RemoteReDeploymentMultiJvmSpec@localhost:55627/remote/artery/RemoteReDeploymentMultiJvmSpec@localhost:65490/user/parent/hello#-370928728],
|
|
||||||
// this will not end well
|
|
||||||
|
|
||||||
runOn(first) {
|
runOn(first) {
|
||||||
testConductor.blackhole(second, first, Both).await
|
testConductor.blackhole(second, first, Both).await
|
||||||
testConductor.shutdown(second, abort = true).await
|
testConductor.shutdown(second, abort = true).await
|
||||||
|
|
|
||||||
|
|
@ -65,14 +65,10 @@ class RemoteRoundRobinMultiJvmNode2 extends RemoteRoundRobinSpec(new RemoteRound
|
||||||
class RemoteRoundRobinMultiJvmNode3 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = false))
|
class RemoteRoundRobinMultiJvmNode3 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = false))
|
||||||
class RemoteRoundRobinMultiJvmNode4 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = false))
|
class RemoteRoundRobinMultiJvmNode4 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = false))
|
||||||
|
|
||||||
// FIXME this test fails with Artery
|
class ArteryRemoteRoundRobinMultiJvmNode1 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true))
|
||||||
// [akka://RemoteRoundRobinSpec/user/service-hello2] received Supervise from unregistered child
|
class ArteryRemoteRoundRobinMultiJvmNode2 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true))
|
||||||
// Actor[artery://RemoteRoundRobinSpec@localhost:52247/remote/artery/RemoteRoundRobinSpec@localhost:56386/user/service-hello2/c2#-2080820302],
|
class ArteryRemoteRoundRobinMultiJvmNode3 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true))
|
||||||
// this will not end well
|
class ArteryRemoteRoundRobinMultiJvmNode4 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true))
|
||||||
//class ArteryRemoteRoundRobinMultiJvmNode1 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true))
|
|
||||||
//class ArteryRemoteRoundRobinMultiJvmNode2 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true))
|
|
||||||
//class ArteryRemoteRoundRobinMultiJvmNode3 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true))
|
|
||||||
//class ArteryRemoteRoundRobinMultiJvmNode4 extends RemoteRoundRobinSpec(new RemoteRoundRobinConfig(artery = true))
|
|
||||||
|
|
||||||
object RemoteRoundRobinSpec {
|
object RemoteRoundRobinSpec {
|
||||||
class SomeActor extends Actor {
|
class SomeActor extends Actor {
|
||||||
|
|
|
||||||
|
|
@ -233,7 +233,8 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||||
|
|
||||||
var seqNo = 1L
|
// TODO we might need have to prune old unused entries
|
||||||
|
var sequenceNumbers = Map.empty[UniqueAddress, Long]
|
||||||
|
|
||||||
def localAddress = inboundContext.localAddress
|
def localAddress = inboundContext.localAddress
|
||||||
|
|
||||||
|
|
@ -242,16 +243,20 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G
|
||||||
val env = grab(in)
|
val env = grab(in)
|
||||||
env.message match {
|
env.message match {
|
||||||
case sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo) ⇒
|
case sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo) ⇒
|
||||||
if (n == seqNo) {
|
val expectedSeqNo = sequenceNumbers.get(ackReplyTo) match {
|
||||||
|
case None ⇒ 1L
|
||||||
|
case Some(seqNo) ⇒ seqNo
|
||||||
|
}
|
||||||
|
if (n == expectedSeqNo) {
|
||||||
inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress))
|
inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress))
|
||||||
seqNo += 1
|
sequenceNumbers = sequenceNumbers.updated(ackReplyTo, n + 1)
|
||||||
val unwrapped = env.withMessage(sysEnv.message)
|
val unwrapped = env.withMessage(sysEnv.message)
|
||||||
push(out, unwrapped)
|
push(out, unwrapped)
|
||||||
} else if (n < seqNo) {
|
} else if (n < expectedSeqNo) {
|
||||||
inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress))
|
inboundContext.sendControl(ackReplyTo.address, Ack(expectedSeqNo - 1, localAddress))
|
||||||
pull(in)
|
pull(in)
|
||||||
} else {
|
} else {
|
||||||
inboundContext.sendControl(ackReplyTo.address, Nack(seqNo - 1, localAddress))
|
inboundContext.sendControl(ackReplyTo.address, Nack(expectedSeqNo - 1, localAddress))
|
||||||
pull(in)
|
pull(in)
|
||||||
}
|
}
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
|
|
|
||||||
|
|
@ -102,7 +102,6 @@ class RemoteRouterSpec extends AkkaSpec("""
|
||||||
val probe = TestProbe()(masterSystem)
|
val probe = TestProbe()(masterSystem)
|
||||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps), "blub")
|
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps), "blub")
|
||||||
val replies = collectRouteePaths(probe, router, 5)
|
val replies = collectRouteePaths(probe, router, 5)
|
||||||
println(s"# replies $replies") // FIXME
|
|
||||||
val children = replies.toSet
|
val children = replies.toSet
|
||||||
children should have size 2
|
children should have size 2
|
||||||
children.map(_.parent) should have size 1
|
children.map(_.parent) should have size 1
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,132 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.remote.artery
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
import akka.actor.Address
|
||||||
|
import akka.remote.UniqueAddress
|
||||||
|
import akka.remote.artery.SystemMessageDelivery._
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.stream.ActorMaterializerSettings
|
||||||
|
import akka.stream.scaladsl.Keep
|
||||||
|
import akka.stream.testkit.TestPublisher
|
||||||
|
import akka.stream.testkit.TestSubscriber
|
||||||
|
import akka.stream.testkit.scaladsl.TestSink
|
||||||
|
import akka.stream.testkit.scaladsl.TestSource
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
import akka.testkit.ImplicitSender
|
||||||
|
import akka.testkit.TestProbe
|
||||||
|
import akka.util.OptionVal
|
||||||
|
|
||||||
|
class SystemMessageAckerSpec extends AkkaSpec with ImplicitSender {
|
||||||
|
|
||||||
|
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
|
||||||
|
implicit val mat = ActorMaterializer(matSettings)(system)
|
||||||
|
|
||||||
|
val addressA = UniqueAddress(Address("artery", "sysA", "hostA", 1001), 1)
|
||||||
|
val addressB = UniqueAddress(Address("artery", "sysB", "hostB", 1002), 2)
|
||||||
|
val addressC = UniqueAddress(Address("artery", "sysC", "hostB", 1003), 3)
|
||||||
|
|
||||||
|
private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = {
|
||||||
|
val recipient = OptionVal.None // not used
|
||||||
|
TestSource.probe[AnyRef]
|
||||||
|
.map {
|
||||||
|
case sysMsg @ SystemMessageEnvelope(_, _, ackReplyTo) ⇒
|
||||||
|
InboundEnvelope(recipient, addressA.address, sysMsg, OptionVal.None, ackReplyTo.uid,
|
||||||
|
inboundContext.association(ackReplyTo.uid))
|
||||||
|
}
|
||||||
|
.via(new SystemMessageAcker(inboundContext))
|
||||||
|
.map { case env: InboundEnvelope ⇒ env.message }
|
||||||
|
.toMat(TestSink.probe[Any])(Keep.both)
|
||||||
|
.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
"SystemMessageAcker stage" must {
|
||||||
|
|
||||||
|
"send Ack for expected message" in {
|
||||||
|
val replyProbe = TestProbe()
|
||||||
|
val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref))
|
||||||
|
val (upstream, downstream) = setupStream(inboundContext)
|
||||||
|
|
||||||
|
downstream.request(10)
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(1, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b2", 2, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(2, addressA))
|
||||||
|
downstream.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
"send Ack for duplicate message" in {
|
||||||
|
val replyProbe = TestProbe()
|
||||||
|
val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref))
|
||||||
|
val (upstream, downstream) = setupStream(inboundContext)
|
||||||
|
|
||||||
|
downstream.request(10)
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(1, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b2", 2, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(2, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(2, addressA))
|
||||||
|
downstream.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
"send Nack for unexpected message" in {
|
||||||
|
val replyProbe = TestProbe()
|
||||||
|
val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref))
|
||||||
|
val (upstream, downstream) = setupStream(inboundContext)
|
||||||
|
|
||||||
|
downstream.request(10)
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(1, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b3", 3, addressB))
|
||||||
|
replyProbe.expectMsg(Nack(1, addressA))
|
||||||
|
downstream.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
"send Nack for unexpected first message" in {
|
||||||
|
val replyProbe = TestProbe()
|
||||||
|
val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref))
|
||||||
|
val (upstream, downstream) = setupStream(inboundContext)
|
||||||
|
|
||||||
|
downstream.request(10)
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b2", 2, addressB))
|
||||||
|
replyProbe.expectMsg(Nack(0, addressA))
|
||||||
|
downstream.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
"keep track of sequence numbers per sending system" in {
|
||||||
|
val replyProbe = TestProbe()
|
||||||
|
val inboundContext = new TestInboundContext(addressA, controlProbe = Some(replyProbe.ref))
|
||||||
|
val (upstream, downstream) = setupStream(inboundContext)
|
||||||
|
|
||||||
|
downstream.request(10)
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b1", 1, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(1, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b2", 2, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(2, addressA))
|
||||||
|
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("c1", 1, addressC))
|
||||||
|
replyProbe.expectMsg(Ack(1, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("c3", 3, addressC))
|
||||||
|
replyProbe.expectMsg(Nack(1, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("c2", 2, addressC))
|
||||||
|
replyProbe.expectMsg(Ack(2, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("c3", 3, addressC))
|
||||||
|
replyProbe.expectMsg(Ack(3, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("c4", 4, addressC))
|
||||||
|
replyProbe.expectMsg(Ack(4, addressA))
|
||||||
|
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b4", 4, addressB))
|
||||||
|
replyProbe.expectMsg(Nack(2, addressA))
|
||||||
|
upstream.sendNext(SystemMessageEnvelope("b3", 3, addressB))
|
||||||
|
replyProbe.expectMsg(Ack(3, addressA))
|
||||||
|
|
||||||
|
downstream.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue