2016-06-23 11:58:54 +02:00
|
|
|
/*
|
|
|
|
|
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.remote.artery.compress
|
|
|
|
|
|
|
|
|
|
import com.typesafe.config.ConfigFactory
|
fix many bugs in InboundCompressions, #21464
* comprehensive integration test that revealed many bugs
* confirmations of manifests were wrong, at two places
* using wrong tables when system is restarted, including
originUid in the tables with checks when receiving advertisments
* close (stop scheduling) of advertisments when new incarnation,
quarantine, or restart
* cleanup how deadLetters ref was treated, and made it more robust
* make Decoder tolerant to decompression failures, can happen in
case of system restart before handshake completed
* give up resending advertisment after a few attempts without confirmation,
to avoid keeping outbound association open to possible dead system
* don't advertise new table when no inbound messages,
to avoid keeping outbound association open to possible dead system
* HeaderBuilder could use manifest field from previous message, added
resetMessageFields
* No compression for ArteryMessage, e.g. handshake messages must go
through without depending on compression tables being in sync
* improve debug logging, including originUid
2016-09-15 11:27:00 +02:00
|
|
|
import akka.actor._
|
|
|
|
|
import akka.pattern.ask
|
|
|
|
|
import akka.remote.artery.compress.CompressionProtocol.Events
|
|
|
|
|
import akka.testkit._
|
|
|
|
|
import akka.util.Timeout
|
|
|
|
|
import com.typesafe.config.ConfigFactory
|
|
|
|
|
import org.scalatest.BeforeAndAfter
|
|
|
|
|
|
|
|
|
|
import scala.concurrent.Await
|
|
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
import akka.actor.ExtendedActorSystem
|
|
|
|
|
import akka.serialization.SerializerWithStringManifest
|
|
|
|
|
import akka.remote.artery.ArteryMultiNodeSpec
|
2016-06-23 11:58:54 +02:00
|
|
|
|
|
|
|
|
object CompressionIntegrationSpec {

  /**
   * Configuration shared by every ActorSystem started by this spec:
   *  - INFO log level,
   *  - `TestMessage` bound to `TestMessageSerializer`,
   *  - actor-ref and manifest compression tables advertised every 2 seconds.
   */
  val commonConfig = ConfigFactory.parseString("""
     akka {
       loglevel = INFO

       actor {
         serializers {
           test-message = "akka.remote.artery.compress.TestMessageSerializer"
         }
         serialization-bindings {
           "akka.remote.artery.compress.TestMessage" = test-message
         }
       }

       remote.artery.advanced.compression {
         actor-refs.advertisement-interval = 2 seconds
         manifests.advertisement-interval = 2 seconds
       }
     }
  """)

}
|
|
|
|
|
|
|
|
|
|
class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrationSpec.commonConfig)
|
|
|
|
|
with ImplicitSender {
|
|
|
|
|
import CompressionIntegrationSpec._
|
|
|
|
|
|
|
|
|
|
// Second ActorSystem, named "systemB", used as the remote peer of `system` in the tests below.
val systemB = newRemoteSystem(name = Some("systemB"))

// Size of each message burst sent to an echo actor; the bursts are meant to turn the
// recipient and the TestMessage manifest into heavy hitters.
val messagesToExchange = 10
|
|
|
|
|
|
|
|
|
|
"Compression table" must {
|
|
|
|
|
"be advertised for chatty ActorRef and manifest" in {
  // listen for compression table events on both systems
  val aManifestProbe = TestProbe()(system)
  val bManifestProbe = TestProbe()(systemB)
  system.eventStream.subscribe(aManifestProbe.ref, classOf[Events.ReceivedClassManifestCompressionTable])
  systemB.eventStream.subscribe(bManifestProbe.ref, classOf[Events.ReceivedClassManifestCompressionTable])
  val aRefProbe = TestProbe()(system)
  val bRefProbe = TestProbe()(systemB)
  system.eventStream.subscribe(aRefProbe.ref, classOf[Events.ReceivedActorRefCompressionTable])
  systemB.eventStream.subscribe(bRefProbe.ref, classOf[Events.ReceivedActorRefCompressionTable])

  val echoRefB = systemB.actorOf(TestActors.echoActorProps, "echo")

  // resolve the echo actor from system A's point of view
  system.actorSelection(rootActorPath(systemB) / "user" / "echo") ! Identify(None)
  val echoRefA = expectMsgType[ActorIdentity].ref.get

  // cause TestMessage manifest to become a heavy hitter
  // cause echo to become a heavy hitter
  (1 to messagesToExchange).foreach { _ ⇒ echoRefA ! TestMessage("hello") }
  receiveN(messagesToExchange) // the replies

  within(10.seconds) {
    // on system A side
    awaitAssert {
      val a1 = aManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
      info(s"System [A] received: $a1")
      a1.table.version.toInt should be >= (1)
      a1.table.dictionary.keySet should contain("TestMessageManifest")
    }
    awaitAssert {
      val a1 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
      info(s"System [A] received: $a1")
      a1.table.version.toInt should be >= (1)
      a1.table.dictionary.keySet should contain(echoRefA) // recipient
      a1.table.dictionary.keySet should contain(testActor) // sender
    }

    // on system B side
    awaitAssert {
      val b1 = bManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
      info(s"System [B] received: $b1")
      b1.table.version.toInt should be >= (1)
      b1.table.dictionary.keySet should contain("TestMessageManifest")
    }
    awaitAssert {
      val b1 = bRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
      info(s"System [B] received: $b1")
      b1.table.version.toInt should be >= (1)
      b1.table.dictionary.keySet should contain(echoRefB)
    }
  }

  // and if we continue sending, new advertisements with higher version numbers are advertised
  within(20.seconds) {
    // replies to these extra messages go to a throw-away probe instead of testActor
    val ignore = TestProbe()(system)

    awaitAssert {
      echoRefA.tell(TestMessage("hello2"), ignore.ref)
      val a2 = aManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
      info(s"System [A] received more: $a2")
      a2.table.version.toInt should be >= (3)
    }
    awaitAssert {
      echoRefA.tell(TestMessage("hello2"), ignore.ref)
      val a2 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
      info(s"System [A] received more: $a2")
      a2.table.version.toInt should be >= (3)
    }

    awaitAssert {
      echoRefA.tell(TestMessage("hello3"), ignore.ref)
      val b2 = bManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
      info(s"System [B] received more: $b2")
      b2.table.version.toInt should be >= (3)
    }
    awaitAssert {
      echoRefA.tell(TestMessage("hello3"), ignore.ref)
      val b2 = bRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
      info(s"System [B] received more: $b2")
      b2.table.version.toInt should be >= (3)
    }
  }
}
|
2016-09-21 10:22:08 +03:00
|
|
|
|
fix many bugs in InboundCompressions, #21464
* comprehensive integration test that revealed many bugs
* confirmations of manifests were wrong, at two places
* using wrong tables when system is restarted, including
originUid in the tables with checks when receiving advertisments
* close (stop scheduling) of advertisments when new incarnation,
quarantine, or restart
* cleanup how deadLetters ref was treated, and made it more robust
* make Decoder tolerant to decompression failures, can happen in
case of system restart before handshake completed
* give up resending advertisment after a few attempts without confirmation,
to avoid keeping outbound association open to possible dead system
* don't advertise new table when no inbound messages,
to avoid keeping outbound association open to possible dead system
* HeaderBuilder could use manifest field from previous message, added
resetMessageFields
* No compression for ArteryMessage, e.g. handshake messages must go
through without depending on compression tables being in sync
* improve debug logging, including originUid
2016-09-15 11:27:00 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"handle noSender sender" in {
  // The probe observes system A's event stream, so it is created in system A as well.
  val aRefProbe = TestProbe()(system)
  system.eventStream.subscribe(aRefProbe.ref, classOf[Events.ReceivedActorRefCompressionTable])

  // forwarder in system B that relays everything it receives to probeB
  val probeB = TestProbe()(systemB)
  systemB.actorOf(TestActors.forwardActorProps(probeB.ref), "fw1")

  system.actorSelection(rootActorPath(systemB) / "user" / "fw1") ! Identify(None)
  val fwRefA = expectMsgType[ActorIdentity].ref.get

  fwRefA.tell(TestMessage("hello-fw1-a"), ActorRef.noSender)
  probeB.expectMsg(TestMessage("hello-fw1-a"))

  within(10.seconds) {
    // on system A side
    awaitAssert {
      val a1 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
      info(s"System [A] received: $a1")
      a1.table.dictionary.keySet should contain(fwRefA) // recipient
      a1.table.dictionary.keySet should not contain (system.deadLetters) // sender
    }
  }

  // messages must still get through after the table has been advertised
  fwRefA.tell(TestMessage("hello-fw1-b"), ActorRef.noSender)
  probeB.expectMsg(TestMessage("hello-fw1-b"))
}
|
|
|
|
|
|
|
|
|
|
"handle deadLetters sender" in {
  // The probe observes system A's event stream, so it is created in system A as well.
  val aRefProbe = TestProbe()(system)
  system.eventStream.subscribe(aRefProbe.ref, classOf[Events.ReceivedActorRefCompressionTable])

  // forwarder in system B that relays everything it receives to probeB
  val probeB = TestProbe()(systemB)
  systemB.actorOf(TestActors.forwardActorProps(probeB.ref), "fw2")

  system.actorSelection(rootActorPath(systemB) / "user" / "fw2") ! Identify(None)
  val fwRefA = expectMsgType[ActorIdentity].ref.get

  // NOTE(review): the sender used here is ActorRef.noSender, exactly as in
  // "handle noSender sender"; presumably system.deadLetters was intended for this
  // test. Confirm the expected table contents before changing the sender.
  fwRefA.tell(TestMessage("hello-fw2-a"), ActorRef.noSender)
  probeB.expectMsg(TestMessage("hello-fw2-a"))

  within(10.seconds) {
    // on system A side
    awaitAssert {
      val a1 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
      info(s"System [A] received: $a1")
      a1.table.dictionary.keySet should contain(fwRefA) // recipient
      a1.table.dictionary.keySet should not contain (system.deadLetters) // sender
    }
  }

  // messages must still get through after the table has been advertised
  fwRefA.tell(TestMessage("hello-fw2-b"), ActorRef.noSender)
  probeB.expectMsg(TestMessage("hello-fw2-b"))
}
|
|
|
|
|
|
|
|
|
|
"work when starting new ActorSystem with same hostname:port" in {
  // restart "systemB" on the very same port, so that system A sees a new incarnation
  val port = address(systemB).port.get
  shutdown(systemB)
  val systemB2 = newRemoteSystem(
    extraConfig = Some(s"akka.remote.artery.canonical.port=$port"),
    name = Some("systemB"))

  // listen for compression table events
  val aManifestProbe = TestProbe()(system)
  val bManifestProbe = TestProbe()(systemB2)
  system.eventStream.subscribe(aManifestProbe.ref, classOf[Events.ReceivedClassManifestCompressionTable])
  systemB2.eventStream.subscribe(bManifestProbe.ref, classOf[Events.ReceivedClassManifestCompressionTable])
  val aRefProbe = TestProbe()(system)
  val bRefProbe = TestProbe()(systemB2)
  system.eventStream.subscribe(aRefProbe.ref, classOf[Events.ReceivedActorRefCompressionTable])
  systemB2.eventStream.subscribe(bRefProbe.ref, classOf[Events.ReceivedActorRefCompressionTable])

  val echoRefB2 = systemB2.actorOf(TestActors.echoActorProps, "echo2")

  // messages to the new system might be dropped, before new handshake is completed
  within(5.seconds) {
    awaitAssert {
      // a fresh probe per attempt, so late replies of earlier attempts are not picked up
      val p = TestProbe()(system)
      system.actorSelection(rootActorPath(systemB2) / "user" / "echo2").tell(Identify(None), p.ref)
      p.expectMsgType[ActorIdentity](1.second).ref.get
    }
  }

  system.actorSelection(rootActorPath(systemB2) / "user" / "echo2") ! Identify(None)
  val echoRefA = expectMsgType[ActorIdentity].ref.get

  // cause TestMessage manifest to become a heavy hitter
  (1 to messagesToExchange).foreach { _ ⇒ echoRefA ! TestMessage("hello") }
  receiveN(messagesToExchange) // the replies

  within(10.seconds) {
    // on system A side
    awaitAssert {
      val a2 = aManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
      info(s"System [A] received: $a2")
      a2.table.version.toInt should be >= (1)
      a2.table.version.toInt should be < (3)
      a2.table.dictionary.keySet should contain("TestMessageManifest")
    }
    awaitAssert {
      val a2 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
      info(s"System [A] received: $a2")
      a2.table.version.toInt should be >= (1)
      a2.table.version.toInt should be < (3)
      a2.table.dictionary.keySet should contain(echoRefA) // recipient
      a2.table.dictionary.keySet should contain(testActor) // sender
    }

    // on system B2 side
    awaitAssert {
      val b2 = bManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
      info(s"System [B2] received: $b2")
      b2.table.version.toInt should be >= (1)
      b2.table.dictionary.keySet should contain("TestMessageManifest")
    }
    awaitAssert {
      val b2 = bRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
      info(s"System [B2] received: $b2")
      b2.table.version.toInt should be >= (1)
      b2.table.dictionary.keySet should contain(echoRefB2)
    }
  }
}
|
|
|
|
|
|
|
|
|
|
"wrap around" in {
  // very frequent actor-ref advertisements, so that many table versions are produced quickly
  val extraConfig = """
    akka.remote.artery.advanced.compression {
      actor-refs.advertisement-interval = 10 millis
    }
  """

  val systemWrap = newRemoteSystem(extraConfig = Some(extraConfig))

  // The probe observes system A's event stream, so it is created in system A as well.
  val wrapRefProbe = TestProbe()(system)
  system.eventStream.subscribe(wrapRefProbe.ref, classOf[Events.ReceivedActorRefCompressionTable])

  // Starts echo actor `echo_<i>` in systemWrap and returns the ref resolved from system A.
  def createAndIdentify(i: Int) = {
    systemWrap.actorOf(TestActors.echoActorProps, s"echo_$i")
    system.actorSelection(rootActorPath(systemWrap) / "user" / s"echo_$i") ! Identify(None)
    expectMsgType[ActorIdentity].ref.get
  }

  val maxTableVersions = 130 // so table version wraps around at least once
  val maxDuplicateTables = 40 // max duplicate tables that will not fail the test
  var tableVersionsSeen = 0
  var lastTableVersion = 0
  var wrapAroundCount = 0
  var iteration = 0

  while (tableVersionsSeen < maxTableVersions) {
    iteration += 1
    if (iteration - maxTableVersions > maxDuplicateTables) {
      fail("Too many duplicate tables. Giving up on the test.")
    }

    val echoWrap = createAndIdentify(iteration) // create a different actor for every iteration

    // cause echo to become a heavy hitter
    (1 to messagesToExchange).foreach { _ ⇒ echoWrap ! TestMessage("hello") }
    receiveN(messagesToExchange) // the replies

    // on system A side
    val a1 = wrapRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
    val currentTableVersion = a1.table.version.toInt

    if (currentTableVersion != lastTableVersion) { // if we get a new table
      lastTableVersion = currentTableVersion
      tableVersionsSeen += 1

      // every 128 distinct tables the expected version below starts over
      if ((tableVersionsSeen & 0x7F) == 0) {
        wrapAroundCount += 1
      }
    }
    currentTableVersion should ===((tableVersionsSeen & 0x7F) + wrapAroundCount)
  }
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Payload sent between the systems in this spec; the spec's config binds it to `TestMessageSerializer`. */
final case class TestMessage(name: String)
|
|
|
|
|
|
|
|
|
|
/**
 * Serializer for [[TestMessage]] that uses a string manifest.
 *
 * The payload is the message name encoded as UTF-8. The charset is given explicitly so
 * that both sides agree on the encoding regardless of the platform default charset.
 */
class TestMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest {
  import java.nio.charset.StandardCharsets.UTF_8

  val TestMessageManifest = "TestMessageManifest"

  override val identifier: Int = 101

  /** Manifest for `o`; only [[TestMessage]] is supported, anything else fails the match. */
  override def manifest(o: AnyRef): String =
    o match {
      case _: TestMessage ⇒ TestMessageManifest
    }

  /** Encodes the name of a [[TestMessage]] as UTF-8 bytes; anything else fails the match. */
  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case msg: TestMessage ⇒ msg.name.getBytes(UTF_8)
  }

  /**
   * Decodes `bytes` as the UTF-8 name of a [[TestMessage]].
   * Throws an `Exception` when `manifest` is not `TestMessageManifest`.
   */
  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    manifest match {
      case TestMessageManifest ⇒ TestMessage(new String(bytes, UTF_8))
      case unknown             ⇒ throw new Exception("Unknown manifest: " + unknown)
    }
}
|