#2726 - removing a potential race between initializing the ReliableProxySpec
This commit is contained in:
parent
cb7330f738
commit
ac67edc103
1 changed files with 49 additions and 40 deletions
|
|
@ -41,62 +41,71 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
}
|
||||
}
|
||||
|
||||
runOn(remote) {
|
||||
system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case x ⇒ testActor ! x
|
||||
}
|
||||
}), "echo")
|
||||
}
|
||||
@volatile var target: ActorRef = system.deadLetters
|
||||
@volatile var proxy: ActorRef = system.deadLetters
|
||||
|
||||
val target = system.actorFor(node(remote) / "user" / "echo")
|
||||
|
||||
var proxy: ActorRef = _
|
||||
def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s))
|
||||
def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2))
|
||||
|
||||
runOn(local) {
|
||||
//#demo
|
||||
import akka.contrib.pattern.ReliableProxy
|
||||
|
||||
proxy = system.actorOf(Props(new ReliableProxy(target, 100.millis)), "proxy")
|
||||
//#demo
|
||||
proxy ! FSM.SubscribeTransitionCallBack(testActor)
|
||||
expectState(Idle)
|
||||
//#demo
|
||||
proxy ! "hello"
|
||||
//#demo
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
runOn(remote) {
|
||||
expectMsg("hello")
|
||||
}
|
||||
|
||||
def sendN(n: Int) = (1 to n) foreach (proxy ! _)
|
||||
def expectN(n: Int) = (1 to n) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
|
||||
"A ReliableProxy" must {
|
||||
|
||||
"initialize properly" in {
|
||||
runOn(remote) {
|
||||
target = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case x ⇒ testActor ! x
|
||||
}
|
||||
}), "echo")
|
||||
}
|
||||
|
||||
enterBarrier("initialize")
|
||||
|
||||
runOn(local) {
|
||||
//#demo
|
||||
import akka.contrib.pattern.ReliableProxy
|
||||
|
||||
target = system.actorFor(node(remote) / "user" / "echo")
|
||||
proxy = system.actorOf(Props(new ReliableProxy(target, 100.millis)), "proxy")
|
||||
//#demo
|
||||
proxy ! FSM.SubscribeTransitionCallBack(testActor)
|
||||
expectState(Idle)
|
||||
//#demo
|
||||
proxy ! "hello"
|
||||
//#demo
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
|
||||
runOn(remote) {
|
||||
expectMsg("hello")
|
||||
}
|
||||
}
|
||||
|
||||
"forward messages in sequence" in {
|
||||
runOn(local) {
|
||||
(1 to 100) foreach (proxy ! _)
|
||||
sendN(100)
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
expectN(100)
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("test1a")
|
||||
|
||||
runOn(local) {
|
||||
(1 to 100) foreach (proxy ! _)
|
||||
sendN(100)
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
expectN(100)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -106,7 +115,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
"retry when sending fails" in {
|
||||
runOn(local) {
|
||||
testConductor.blackhole(local, remote, Direction.Send).await
|
||||
(1 to 100) foreach (proxy ! _)
|
||||
sendN(100)
|
||||
within(1 second) {
|
||||
expectTransition(Idle, Active)
|
||||
expectNoMsg
|
||||
|
|
@ -127,7 +136,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
expectN(100)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -137,7 +146,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
"retry when receiving fails" in {
|
||||
runOn(local) {
|
||||
testConductor.blackhole(local, remote, Direction.Receive).await
|
||||
(1 to 100) foreach (proxy ! _)
|
||||
sendN(100)
|
||||
within(1 second) {
|
||||
expectTransition(Idle, Active)
|
||||
expectNoMsg
|
||||
|
|
@ -145,7 +154,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
expectN(100)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -162,7 +171,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
"resend across a slow link" in {
|
||||
runOn(local) {
|
||||
testConductor.throttle(local, remote, Direction.Send, rateMBit = 0.1).await
|
||||
(1 to 50) foreach (proxy ! _)
|
||||
sendN(50)
|
||||
within(5 seconds) {
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
|
|
@ -170,7 +179,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
}
|
||||
runOn(remote) {
|
||||
within(5 seconds) {
|
||||
(1 to 50) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
expectN(50)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -179,7 +188,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
runOn(local) {
|
||||
testConductor.throttle(local, remote, Direction.Send, rateMBit = -1).await
|
||||
testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.1).await
|
||||
(1 to 50) foreach (proxy ! _)
|
||||
sendN(50)
|
||||
within(5 seconds) {
|
||||
expectTransition(Idle, Active)
|
||||
expectTransition(Active, Idle)
|
||||
|
|
@ -187,7 +196,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
|
|||
}
|
||||
runOn(remote) {
|
||||
within(1 second) {
|
||||
(1 to 50) foreach { n ⇒ expectMsg(n); lastSender must be === target }
|
||||
expectN(50)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue