diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 0ed0f557eb..1577b62e13 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -8,6 +8,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, LocalActorRef, RemoteAc import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement +import se.scalablesolutions.akka.dispatch.MessageInvocation import se.scalablesolutions.akka.remote.{RemoteServer, RemoteRequestProtocolIdFactory, MessageSerializer} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import ActorTypeProtocol._ @@ -74,19 +75,19 @@ object ActorSerialization { def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef = fromBinaryToLocalActorRef(bytes, format) - def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] = - toSerializedActorRefProtocol(a, format).toByteArray + def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Format[T]): Array[Byte] = + toSerializedActorRefProtocol(a, format, serializeMailBox).toByteArray // wrapper for implicits to be used by Java def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef = fromBinary(bytes)(format) // wrapper for implicits to be used by Java - def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T]): Array[Byte] = - toBinary(a)(format) + def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] = + toBinary(a, srlMailBox)(format) private def toSerializedActorRefProtocol[T <: Actor]( - actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = { + actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = { val lifeCycleProtocol: Option[LifeCycleProtocol] = { def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT) @@ -114,6 +115,29 @@ object ActorSerialization { .setIsTransactor(actorRef.isTransactor) .setTimeout(actorRef.timeout) + if (serializeMailBox == true) { + val messages = + actorRef.mailbox match { + case q: java.util.Queue[MessageInvocation] => + val l = new scala.collection.mutable.ListBuffer[MessageInvocation] + val it = q.iterator + while (it.hasNext == true) l += it.next + l + } + + val requestProtocols = + messages.map(m => + RemoteActorSerialization.createRemoteRequestProtocolBuilder( + actorRef, + m.message, + false, + actorRef.getSender, + None, + ActorType.ScalaActor).build) + + requestProtocols.foreach(rp => builder.addMessages(rp)) + } + actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_)) builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T]))) lifeCycleProtocol.foreach(builder.setLifeCycle(_)) diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 7e8babe168..832a655c22 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -127,9 +127,16 @@ class SerializableTypeClassActorSpec extends (actor1 ! "hello") (actor1 ! "hello") (actor1 ! "hello") + actor1.mailboxSize should be > (0) val actor2 = fromBinary(toBinary(actor1)) Thread.sleep(1000) + actor2.mailboxSize should be > (0) (actor2 !! "hello-reply").getOrElse("_") should equal("world") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello-reply").getOrElse("_") should equal("world") } } } diff --git a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala new file mode 100644 index 0000000000..ed175ea0ad --- /dev/null +++ b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala @@ -0,0 +1,126 @@ +package se.scalablesolutions.akka.actor.serialization + + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.serialization._ +import se.scalablesolutions.akka.actor._ +import ActorSerialization._ +import Actor._ + +@RunWith(classOf[JUnitRunner]) +class Ticket435Spec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + object BinaryFormatMyStatefulActor { + implicit object MyStatefulActorFormat extends Format[MyStatefulActor] { + def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = { + val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + act.count = p.getCount + act + } + def toBinary(ac: MyStatefulActor) = + ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray + } + } + + object BinaryFormatMyStatelessActorWithMessagesInMailbox { + implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox] + } + + describe("Serializable actor") { + + it("should be able to serialize and deserialize a stateless actor with messages in mailbox") { + import BinaryFormatMyStatelessActorWithMessagesInMailbox._ + + val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + actor1.mailboxSize should be > (0) + val actor2 = fromBinary(toBinary(actor1)) + Thread.sleep(1000) + actor2.mailboxSize should be > (0) + (actor2 !! "hello-reply").getOrElse("_") should equal("world") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello-reply").getOrElse("_") should equal("world") + } + + it("should serialize the mailbox optionally") { + import BinaryFormatMyStatelessActorWithMessagesInMailbox._ + + val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + actor1.mailboxSize should be > (0) + + val actor2 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor2.mailboxSize should equal(0) + (actor2 !! "hello-reply").getOrElse("_") should equal("world") + } + + it("should be able to serialize and deserialize a stateful actor with messages in mailbox") { + import BinaryFormatMyStatefulActor._ + + val actor1 = actorOf[MyStatefulActor].start + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + actor1.mailboxSize should be > (0) + val actor2 = fromBinary(toBinary(actor1)) + Thread.sleep(1000) + actor2.mailboxSize should be > (0) + (actor2 !! "hello").getOrElse("_") should equal("world 1") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello").getOrElse("_") should equal("world 1") + } + } +} + +class MyStatefulActor extends Actor { + var count = 0 + + def receive = { + case "hi" => + println("# messages in mailbox " + self.mailboxSize) + Thread.sleep(500) + case "hello" => + count = count + 1 + self.reply("world " + count) + } +}