Make it easer to override SystemMessage serialization. Fixes #2940
* Make SystemMessage extend Serializable to avoid ambiguity when
setting serialization-bindings.
* Set serialVersionUID in SystemMessages and create tests to
ensure binary formats remain unchanged.
* Add tests for reference.conf's serialization settings.
* Make some existing serialization tests more robust.
Removed boilerplate from serialization tests
Use actual reference.conf; tidy up
Make serialization compatible
This commit is contained in:
parent
f12e11df46
commit
7b3ec79c0c
4 changed files with 225 additions and 59 deletions
|
|
@ -8,17 +8,20 @@ import language.postfixOps
|
|||
|
||||
import akka.testkit.{ AkkaSpec, EventFilter }
|
||||
import akka.actor._
|
||||
import akka.dispatch._
|
||||
import java.io._
|
||||
import scala.concurrent.Await
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.duration._
|
||||
import scala.reflect.BeanInfo
|
||||
import com.google.protobuf.Message
|
||||
import com.typesafe.config._
|
||||
import akka.pattern.ask
|
||||
import org.apache.commons.codec.binary.Hex.{ encodeHex, decodeHex }
|
||||
|
||||
object SerializeSpec {
|
||||
object SerializationTests {
|
||||
|
||||
val config = """
|
||||
val serializeConf = """
|
||||
akka {
|
||||
actor {
|
||||
serializers {
|
||||
|
|
@ -26,13 +29,13 @@ object SerializeSpec {
|
|||
}
|
||||
|
||||
serialization-bindings {
|
||||
"akka.serialization.SerializeSpec$Person" = java
|
||||
"akka.serialization.SerializeSpec$Address" = java
|
||||
"akka.serialization.TestSerializble" = test
|
||||
"akka.serialization.SerializeSpec$PlainMessage" = test
|
||||
"akka.serialization.SerializeSpec$A" = java
|
||||
"akka.serialization.SerializeSpec$B" = test
|
||||
"akka.serialization.SerializeSpec$D" = test
|
||||
"akka.serialization.SerializationTests$Person" = java
|
||||
"akka.serialization.SerializationTests$Address" = java
|
||||
"akka.serialization.TestSerializable" = test
|
||||
"akka.serialization.SerializationTests$PlainMessage" = test
|
||||
"akka.serialization.SerializationTests$A" = java
|
||||
"akka.serialization.SerializationTests$B" = test
|
||||
"akka.serialization.SerializationTests$D" = test
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -45,11 +48,11 @@ object SerializeSpec {
|
|||
|
||||
case class Record(id: Int, person: Person)
|
||||
|
||||
class SimpleMessage(s: String) extends TestSerializble
|
||||
class SimpleMessage(s: String) extends TestSerializable
|
||||
|
||||
class ExtendedSimpleMessage(s: String, i: Int) extends SimpleMessage(s)
|
||||
|
||||
trait AnotherInterface extends TestSerializble
|
||||
trait AnotherInterface extends TestSerializable
|
||||
|
||||
class AnotherMessage extends AnotherInterface
|
||||
|
||||
|
|
@ -67,11 +70,67 @@ object SerializeSpec {
|
|||
class D extends A
|
||||
class E extends D
|
||||
|
||||
val verifySerializabilityConf = """
|
||||
akka {
|
||||
actor {
|
||||
serialize-messages = on
|
||||
serialize-creators = on
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
class FooActor extends Actor {
|
||||
def receive = {
|
||||
case s: String ⇒ sender ! s
|
||||
}
|
||||
}
|
||||
|
||||
class FooUntypedActor extends UntypedActor {
|
||||
def onReceive(message: Any) {}
|
||||
}
|
||||
|
||||
class NonSerializableActor(system: ActorSystem) extends Actor {
|
||||
def receive = {
|
||||
case s: String ⇒ sender ! s
|
||||
}
|
||||
}
|
||||
|
||||
def mostlyReferenceSystem: ActorSystem = {
|
||||
val referenceConf = ConfigFactory.defaultReference()
|
||||
val mostlyReferenceConf = AkkaSpec.testConf.withFallback(referenceConf)
|
||||
ActorSystem("SerializationSystem", mostlyReferenceConf)
|
||||
}
|
||||
|
||||
val systemMessageMultiSerializerConf = """
|
||||
akka {
|
||||
actor {
|
||||
serializers {
|
||||
test = "akka.serialization.TestSerializer"
|
||||
}
|
||||
|
||||
serialization-bindings {
|
||||
"akka.dispatch.SystemMessage" = test
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
val systemMessageClasses = List[Class[_]](
|
||||
classOf[Create],
|
||||
classOf[Recreate],
|
||||
classOf[Suspend],
|
||||
classOf[Resume],
|
||||
classOf[Terminate],
|
||||
classOf[Supervise],
|
||||
classOf[ChildTerminated],
|
||||
classOf[Watch],
|
||||
classOf[Unwatch],
|
||||
NoMessage.getClass)
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
||||
import SerializeSpec._
|
||||
class SerializeSpec extends AkkaSpec(SerializationTests.serializeConf) {
|
||||
import SerializationTests._
|
||||
|
||||
val ser = SerializationExtension(system)
|
||||
import ser._
|
||||
|
|
@ -156,7 +215,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
|||
|
||||
"give warning for message with several bindings" in {
|
||||
EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept {
|
||||
ser.serializerFor(classOf[Both]).getClass must be(classOf[TestSerializer])
|
||||
ser.serializerFor(classOf[Both]).getClass must (be(classOf[TestSerializer]) or be(classOf[JavaSerializer]))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -164,7 +223,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
|||
ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer])
|
||||
ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer])
|
||||
EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept {
|
||||
ser.serializerFor(classOf[C]).getClass must be(classOf[JavaSerializer])
|
||||
ser.serializerFor(classOf[C]).getClass must (be(classOf[TestSerializer]) or be(classOf[JavaSerializer]))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -194,36 +253,9 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
|||
}
|
||||
}
|
||||
|
||||
object VerifySerializabilitySpec {
|
||||
val conf = """
|
||||
akka {
|
||||
actor {
|
||||
serialize-messages = on
|
||||
serialize-creators = on
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
class FooActor extends Actor {
|
||||
def receive = {
|
||||
case s: String ⇒ sender ! s
|
||||
}
|
||||
}
|
||||
|
||||
class FooUntypedActor extends UntypedActor {
|
||||
def onReceive(message: Any) {}
|
||||
}
|
||||
|
||||
class NonSerializableActor(system: ActorSystem) extends Actor {
|
||||
def receive = {
|
||||
case s: String ⇒ sender ! s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) {
|
||||
import VerifySerializabilitySpec._
|
||||
class VerifySerializabilitySpec extends AkkaSpec(SerializationTests.verifySerializabilityConf) {
|
||||
import SerializationTests._
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
|
||||
"verify config" in {
|
||||
|
|
@ -260,7 +292,85 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf)
|
|||
}
|
||||
}
|
||||
|
||||
trait TestSerializble
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ReferenceSerializationSpec extends AkkaSpec(SerializationTests.mostlyReferenceSystem) {
|
||||
import SerializationTests._
|
||||
|
||||
val ser = SerializationExtension(system)
|
||||
def serializerMustBe(toSerialize: Class[_], expectedSerializer: Class[_]) =
|
||||
ser.serializerFor(toSerialize).getClass must be(expectedSerializer)
|
||||
|
||||
"Serialization settings from reference.conf" must {
|
||||
|
||||
"declare Serializable classes to be use JavaSerializer" in {
|
||||
serializerMustBe(classOf[Serializable], classOf[JavaSerializer])
|
||||
serializerMustBe(classOf[String], classOf[JavaSerializer])
|
||||
for (smc ← systemMessageClasses) {
|
||||
serializerMustBe(smc, classOf[JavaSerializer])
|
||||
}
|
||||
}
|
||||
|
||||
"declare Array[Byte] to use ByteArraySerializer" in {
|
||||
serializerMustBe(classOf[Array[Byte]], classOf[ByteArraySerializer])
|
||||
}
|
||||
|
||||
"not support serialization for other classes" in {
|
||||
intercept[NotSerializableException] { ser.serializerFor(classOf[Object]) }
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyReferenceSystem) {
|
||||
import SerializationTests._
|
||||
|
||||
val ser = SerializationExtension(system)
|
||||
|
||||
"Cross-version serialization compatibility" must {
|
||||
|
||||
"be preserved for SystemMessages" in {
|
||||
val objs = List[(String, Any)](
|
||||
("akka.dispatch.Create", Create(1234)),
|
||||
("akka.dispatch.Recreate", Recreate(FakeThrowable("x"))),
|
||||
("akka.dispatch.Suspend", Suspend()),
|
||||
("akka.dispatch.Resume", Resume(FakeThrowable("x"))),
|
||||
("akka.dispatch.Terminate", Terminate()),
|
||||
("akka.dispatch.Supervise", Supervise(FakeActorRef("child"), true, 2468)),
|
||||
("akka.dispatch.ChildTerminated", ChildTerminated(FakeActorRef("child"))),
|
||||
("akka.dispatch.Watch", Watch(FakeActorRef("watchee"), FakeActorRef("watcher"))),
|
||||
("akka.dispatch.Unwatch", Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher"))),
|
||||
("akka.dispatch.NoMessage", NoMessage))
|
||||
val expectedConf = ConfigFactory.load("akka/serialization/serialized.conf")
|
||||
for ((key, obj) ← objs) {
|
||||
val hex = new String(encodeHex(ser.serialize(obj, obj.getClass).get))
|
||||
hex must be(expectedConf.getString(key))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class OverriddenSystemMessageSerializationSpec extends AkkaSpec(SerializationTests.systemMessageMultiSerializerConf) {
|
||||
import SerializationTests._
|
||||
|
||||
val ser = SerializationExtension(system)
|
||||
|
||||
"Overridden SystemMessage serialization" must {
|
||||
|
||||
"resolve to a single serializer" in {
|
||||
EventFilter.warning(start = "Multiple serializers found", occurrences = 0) intercept {
|
||||
for (smc ← systemMessageClasses) {
|
||||
ser.serializerFor(smc).getClass must be(classOf[TestSerializer])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
trait TestSerializable
|
||||
|
||||
class TestSerializer extends Serializer {
|
||||
def includeManifest: Boolean = false
|
||||
|
|
@ -273,3 +383,26 @@ class TestSerializer extends Serializer {
|
|||
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
|
||||
}
|
||||
|
||||
@SerialVersionUID(1)
|
||||
case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable {
|
||||
override def fillInStackTrace = null
|
||||
}
|
||||
|
||||
@SerialVersionUID(1)
|
||||
case class FakeActorRef(name: String) extends InternalActorRef with ActorRefScope {
|
||||
override def path = RootActorPath(Address("proto", "SomeSystem"), name)
|
||||
override def forward(message: Any)(implicit context: ActorContext) = ???
|
||||
override def isTerminated = ???
|
||||
override def start() = ???
|
||||
override def resume(causedByFailure: Throwable) = ???
|
||||
override def suspend() = ???
|
||||
override def restart(cause: Throwable) = ???
|
||||
override def stop() = ???
|
||||
override def sendSystemMessage(message: SystemMessage) = ???
|
||||
override def provider = ???
|
||||
override def getParent = ???
|
||||
override def getChild(name: Iterator[String]) = ???
|
||||
override def isLocal = ???
|
||||
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = ???
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue