parent
a6a5556a8f
commit
6cbfed975b
1 changed files with 16 additions and 18 deletions
|
|
@ -268,8 +268,10 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat
|
|||
|
||||
val systemWrap = newRemoteSystem(extraConfig = Some(extraConfig))
|
||||
|
||||
val wrapRefProbe = TestProbe()(systemWrap)
|
||||
system.eventStream.subscribe(wrapRefProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
|
||||
val receivedActorRefCompressionTableProbe = TestProbe()(system)
|
||||
system.eventStream.subscribe(
|
||||
receivedActorRefCompressionTableProbe.ref,
|
||||
classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
|
||||
|
||||
def createAndIdentify(i: Int) = {
|
||||
val echoWrap = systemWrap.actorOf(TestActors.echoActorProps, s"echo_$i")
|
||||
|
|
@ -278,32 +280,28 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat
|
|||
}
|
||||
|
||||
val maxTableVersions = 130 // so table version wraps around at least once
|
||||
val maxDuplicateTables = 40 // max duplicate tables that will not fail the test
|
||||
var tableVersionsSeen = 0
|
||||
var lastTableVersion = 0
|
||||
var iteration = 0
|
||||
|
||||
while (tableVersionsSeen < maxTableVersions) {
|
||||
iteration += 1
|
||||
if (iteration - maxTableVersions > maxDuplicateTables) {
|
||||
throw new Error("Too much duplicate tables. Giving up on the test.")
|
||||
}
|
||||
|
||||
for (iteration ← 1 to maxTableVersions) {
|
||||
val echoWrap = createAndIdentify(iteration) // create a different actor for every iteration
|
||||
|
||||
// cause echo to become a heavy hitter
|
||||
(1 to messagesToExchange).foreach { i ⇒ echoWrap ! TestMessage("hello") }
|
||||
receiveN(messagesToExchange) // the replies
|
||||
|
||||
// on system A side
|
||||
val a1 = wrapRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
|
||||
val currentTableVersion = a1.table.version.toInt
|
||||
|
||||
if (currentTableVersion != lastTableVersion) { // if we get a new table
|
||||
receivedActorRefCompressionTableProbe.within(5.seconds) {
|
||||
var currentTableVersion = -1
|
||||
// discard duplicates with awaitAssert until we receive next version
|
||||
receivedActorRefCompressionTableProbe.awaitAssert {
|
||||
val a1 = receivedActorRefCompressionTableProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
|
||||
currentTableVersion = a1.table.version.toInt
|
||||
// until we get next version, discard duplicates
|
||||
currentTableVersion should !==(lastTableVersion)
|
||||
}
|
||||
lastTableVersion = currentTableVersion
|
||||
tableVersionsSeen += 1
|
||||
currentTableVersion should ===(iteration & 0x7F)
|
||||
}
|
||||
currentTableVersion should ===(tableVersionsSeen & 0x7F)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue