fixed ticket #435. Also made serialization of mailbox optional - default true

This commit is contained in:
Debasish Ghosh 2010-09-18 17:21:12 +05:30
parent 14b371b8f7
commit a39ce105e0
3 changed files with 162 additions and 5 deletions

View file

@ -8,6 +8,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, LocalActorRef, RemoteAc
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.dispatch.MessageInvocation
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteRequestProtocolIdFactory, MessageSerializer}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import ActorTypeProtocol._
@ -74,19 +75,19 @@ object ActorSerialization {
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef =
fromBinaryToLocalActorRef(bytes, format)
def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] =
toSerializedActorRefProtocol(a, format).toByteArray
def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Format[T]): Array[Byte] =
toSerializedActorRefProtocol(a, format, serializeMailBox).toByteArray
// wrapper for implicits to be used by Java
def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
fromBinary(bytes)(format)
// wrapper for implicits to be used by Java
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T]): Array[Byte] =
toBinary(a)(format)
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)(format)
private def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
@ -114,6 +115,29 @@ object ActorSerialization {
.setIsTransactor(actorRef.isTransactor)
.setTimeout(actorRef.timeout)
if (serializeMailBox == true) {
val messages =
actorRef.mailbox match {
case q: java.util.Queue[MessageInvocation] =>
val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
val it = q.iterator
while (it.hasNext == true) l += it.next
l
}
val requestProtocols =
messages.map(m =>
RemoteActorSerialization.createRemoteRequestProtocolBuilder(
actorRef,
m.message,
false,
actorRef.getSender,
None,
ActorType.ScalaActor).build)
requestProtocols.foreach(rp => builder.addMessages(rp))
}
actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
lifeCycleProtocol.foreach(builder.setLifeCycle(_))

View file

@ -127,9 +127,16 @@ class SerializableTypeClassActorSpec extends
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
}
}
}

View file

@ -0,0 +1,126 @@
package se.scalablesolutions.akka.actor.serialization
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization._
import se.scalablesolutions.akka.actor._
import ActorSerialization._
import Actor._
@RunWith(classOf[JUnitRunner])
class Ticket435Spec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
object BinaryFormatMyStatefulActor {
implicit object MyStatefulActorFormat extends Format[MyStatefulActor] {
def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount
act
}
def toBinary(ac: MyStatefulActor) =
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
}
}
object BinaryFormatMyStatelessActorWithMessagesInMailbox {
implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox]
}
describe("Serializable actor") {
it("should be able to serialize and deserialize a stateless actor with messages in mailbox") {
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
}
it("should serialize the mailbox optionally") {
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor2.mailboxSize should equal(0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
}
it("should be able to serialize and deserialize a stateful actor with messages in mailbox") {
import BinaryFormatMyStatefulActor._
val actor1 = actorOf[MyStatefulActor].start
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello").getOrElse("_") should equal("world 1")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello").getOrElse("_") should equal("world 1")
}
}
}
class MyStatefulActor extends Actor {
var count = 0
def receive = {
case "hi" =>
println("# messages in mailbox " + self.mailboxSize)
Thread.sleep(500)
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}