2011-06-07 06:36:21 +05:30
|
|
|
/**
|
2012-01-19 18:21:06 +01:00
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
2011-06-07 06:36:21 +05:30
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.serialization
|
|
|
|
|
|
2011-10-11 16:05:48 +02:00
|
|
|
import akka.testkit.AkkaSpec
|
2011-11-22 13:04:10 +01:00
|
|
|
import com.typesafe.config.ConfigFactory
|
2011-12-08 16:07:03 +01:00
|
|
|
import akka.actor._
|
|
|
|
|
import java.io._
|
2011-12-27 17:30:05 +01:00
|
|
|
import akka.dispatch.Await
|
|
|
|
|
import akka.util.Timeout
|
|
|
|
|
import akka.util.duration._
|
2011-12-21 11:25:40 +01:00
|
|
|
import scala.reflect.BeanInfo
|
|
|
|
|
import com.google.protobuf.Message
|
2012-01-18 10:18:51 +01:00
|
|
|
import akka.pattern.ask
|
2011-12-21 11:25:40 +01:00
|
|
|
|
|
|
|
|
/**
 * Serializer that converts protobuf `Message` instances to and from byte arrays.
 *
 * Serialization delegates to `Message.toByteArray`; deserialization reflectively
 * invokes the static `parseFrom(byte[])` method declared on the supplied class.
 */
class ProtobufSerializer extends Serializer {

  /** Parameter-type list used to look up `parseFrom(byte[])` via reflection. */
  val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])

  /** `fromBinary` needs the target class, so a manifest is always requested. */
  def includeManifest: Boolean = true

  def identifier = 2

  /**
   * Turns a protobuf message into its binary form.
   *
   * @param obj the object to serialize; must be a protobuf `Message`
   * @return the bytes produced by `Message.toByteArray`
   * @throws IllegalArgumentException if `obj` is not a protobuf `Message`
   */
  def toBinary(obj: AnyRef): Array[Byte] = obj match {
    case message: Message ⇒
      message.toByteArray
    case _ ⇒
      throw new IllegalArgumentException(
        "Can't serialize a non-protobuf message using protobuf [" + obj + "]")
  }

  /**
   * Rebuilds a protobuf message from its binary form.
   *
   * @param bytes       the serialized message
   * @param clazz       the protobuf message class; required
   * @param classLoader not used by this implementation
   * @return the message returned by the class's static `parseFrom(byte[])`
   * @throws IllegalArgumentException if `clazz` is empty
   */
  def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = {
    // NOTE(review): the message below says "serialize" although this is the
    // deserialization path; wording left untouched to preserve behavior.
    val messageClass: Class[_] = clazz getOrElse {
      throw new IllegalArgumentException(
        "Need a protobuf message class to be able to serialize bytes using protobuf")
    }
    val parseFrom = messageClass.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*)
    // parseFrom is static, hence the null receiver.
    parseFrom.invoke(null, bytes).asInstanceOf[Message]
  }
}
|
2011-06-07 06:36:21 +05:30
|
|
|
|
|
|
|
|
/**
 * Fixtures shared by the `SerializeSpec` test class: the actor-system
 * configuration that declares the serializer bindings, and the sample
 * data types that get round-tripped through serialization.
 */
object SerializeSpec {

  /**
   * Configuration declaring the `java` serializer and binding the sample
   * classes to the `java` and `proto` serializer keys.
   */
  val serializationConf = ConfigFactory.parseString("""
    akka {
      actor {
        serializers {
          java = "akka.serialization.JavaSerializer"
        }

        serialization-bindings {
          java = ["akka.serialization.SerializeSpec$Person", "akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
          proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
        }
      }
    }
  """)

  /** Postal address; the no-arg constructor fills every field with an empty string. */
  @BeanInfo
  case class Address(no: String, street: String, city: String, zip: String) {
    def this() = this("", "", "", "")
  }

  /** Person with an address; the no-arg constructor yields an empty name, age 0 and a null address. */
  @BeanInfo
  case class Person(name: String, age: Int, address: Address) {
    def this() = this("", 0, null)
  }

  /** Sample type that is absent from the bindings above (used by the "default serializer" test). */
  case class Record(id: Int, person: Person)
}
|
|
|
|
|
|
2011-10-21 17:01:22 +02:00
|
|
|
/**
 * Exercises the serialization extension: binding lookup, round-tripping of the
 * sample case classes, refusal to Java-serialize an actor instance, and
 * identity-preserving serialization of the dead-letters reference.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
  import SerializeSpec._

  val ser = SerializationExtension(system)
  import ser._

  val addr = Address("120", "Monroe Street", "Santa Clara", "95050")
  val person = Person("debasish ghosh", 25, Address("120", "Monroe Street", "Santa Clara", "95050"))

  /**
   * Serializes `value`, then deserializes the resulting bytes as `clazz`.
   * Fails the running test if either step reports an exception.
   */
  private def serializeThenDeserialize(value: AnyRef, clazz: Class[_]) = {
    val payload = serialize(value) match {
      case Right(bytes)    ⇒ bytes
      case Left(exception) ⇒ fail(exception)
    }
    deserialize(payload, clazz, None) match {
      case Right(restored) ⇒ restored
      case Left(exception) ⇒ fail(exception)
    }
  }

  "Serialization" must {

    "have correct bindings" in {
      ser.bindings(addr.getClass.getName) must be("java")
      ser.bindings("akka.actor.ProtobufProtocol$MyMessage") must be("proto")
    }

    "serialize Address" in {
      val restored = serializeThenDeserialize(addr, classOf[Address])
      assert(restored === addr)
    }

    "serialize Person" in {
      val restored = serializeThenDeserialize(person, classOf[Person])
      assert(restored === person)
    }

    "serialize record with default serializer" in {
      val record = Record(100, person)
      val restored = serializeThenDeserialize(record, classOf[Record])
      assert(restored === record)
    }

    "not serialize ActorCell" in {
      // The actor tries to write itself to whatever stream it receives and
      // reports "pass" only when that attempt is rejected as non-serializable.
      val selfWritingActor = system.actorOf(Props(new Actor {
        def receive = {
          case stream: ObjectOutputStream ⇒
            try stream.writeObject(this)
            catch {
              case _: NotSerializableException ⇒ testActor ! "pass"
            }
        }
      }))
      val sink = new ObjectOutputStream(new ByteArrayOutputStream())
      selfWritingActor ! sink
      expectMsg("pass")
      system.stop(selfWritingActor)
    }

    "serialize DeadLetterActorRef" in {
      val outbuf = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(outbuf)
      val a = ActorSystem("SerializeDeadLeterActorRef", AkkaSpec.testConf)
      try {
        out.writeObject(a.deadLetters)
        out.flush()
        out.close()

        val serialized = outbuf.toByteArray
        val in = new ObjectInputStream(new ByteArrayInputStream(serialized))
        // Deserialization happens with `a` installed as the current system.
        Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) {
          val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef]
          // Reading back must yield the very same instance, not a copy.
          (deadLetters eq a.deadLetters) must be(true)
        }
      } finally {
        a.shutdown()
      }
    }
  }
}
|
2011-12-27 17:30:05 +01:00
|
|
|
|
|
|
|
|
/**
 * Fixtures for the `VerifySerializabilitySpec` test class: a configuration that
 * switches on `serialize-messages` and `serialize-creators`, plus two
 * string-echoing actors.
 */
object VerifySerializabilitySpec {

  /** Configuration enabling both serialization checks and declaring the serializers and bindings. */
  val conf = ConfigFactory.parseString("""
    akka {
      actor {
        serialize-messages = on

        serialize-creators = on

        serializers {
          java = "akka.serialization.JavaSerializer"
          proto = "akka.serialization.ProtobufSerializer"
          default = "akka.serialization.JavaSerializer"
        }

        serialization-bindings {
          java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
          proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
        }
      }
    }
  """)

  /** Replies to the sender with every `String` it receives. */
  class FooActor extends Actor {
    def receive = {
      case text: String ⇒ sender ! text
    }
  }

  /**
   * Same echo behavior as `FooActor`, but takes an `ActorSystem` constructor
   * argument that the body never uses.
   * NOTE(review): presumably the argument exists so that a creator closing
   * over it is not serializable — confirm against the spec using it.
   */
  class NonSerializableActor(system: ActorSystem) extends Actor {
    def receive = {
      case text: String ⇒ sender ! text
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Checks that the `serialize-creators` and `serialize-messages` settings from
 * `VerifySerializabilitySpec.conf` are active and enforced.
 *
 * Each test body contains the assertions that belong to its name: the creator
 * check lives in "verify creators" and the message check in "verify messages".
 */
class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) {
  import VerifySerializabilitySpec._

  /** Timeout applied to every `ask` issued by this spec. */
  implicit val timeout = Timeout(5 seconds)

  "verify config" in {
    system.settings.SerializeAllCreators must be(true)
    system.settings.SerializeAllMessages must be(true)
  }

  "verify creators" in {
    // A creator that only references the actor class is accepted.
    val a = system.actorOf(Props[FooActor])
    system stop a

    // A creator closing over the actor system must be rejected.
    intercept[NotSerializableException] {
      system.actorOf(Props(new NonSerializableActor(system)))
    }
  }

  "verify messages" in {
    val a = system.actorOf(Props[FooActor])
    try {
      // A serializable message is delivered and echoed back.
      Await.result(a ? "pigdog", timeout.duration) must be("pigdog")

      // A plain AnyRef is not serializable and must be rejected.
      intercept[NotSerializableException] {
        Await.result(a ? new AnyRef, timeout.duration)
      }
    } finally {
      // Stop the actor even when an assertion above fails.
      system stop a
    }
  }
}
|