From 3aed09c43d5bb9c578b3543ced2a3c18fdc2fb10 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 1 Nov 2011 11:20:02 +0100 Subject: [PATCH] #1320 - Implementing readResolve and writeReplace for the DeadLetterActorRef --- .../akka/serialization/SerializeSpec.scala | 18 ++++++++++++++ .../src/main/scala/akka/actor/ActorRef.scala | 24 +++++++++++++++---- .../scala/akka/actor/ActorRefProvider.scala | 1 + 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 72cac70ff9..69c39a99d8 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -7,6 +7,9 @@ package akka.serialization import akka.serialization.Serialization._ import scala.reflect._ import akka.testkit.AkkaSpec +import akka.AkkaApplication +import java.io.{ ObjectInputStream, ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream } +import akka.actor.DeadLetterActorRef object SerializeSpec { @BeanInfo @@ -61,5 +64,20 @@ class SerializeSpec extends AkkaSpec { case Right(p) ⇒ assert(p === r) } } + + "serialize DeadLetterActorRef" in { + val outbuf = new ByteArrayOutputStream() + val out = new ObjectOutputStream(outbuf) + val a = new AkkaApplication() + out.writeObject(a.deadLetters) + out.flush() + out.close() + + val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray)) + Serialization.app.withValue(a) { + val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef] + deadLetters must be(a.deadLetters) + } + } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bbee6ef5ea..4ea551cce1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -11,6 +11,7 @@ import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.AkkaApplication import akka.event.ActorEventBus import akka.serialization.Serialization +import akka.actor.DeadLetterActorRef.SerializedDeadLetterActorRef /** * ActorRef is an immutable and serializable handle to an Actor. @@ -350,6 +351,15 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { case class DeadLetter(message: Any, sender: ActorRef) +object DeadLetterActorRef { + class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? + @throws(classOf[java.io.ObjectStreamException]) + private def readResolve(): AnyRef = Serialization.app.value.deadLetters + } + + val serialized = new SerializedDeadLetterActorRef +} + class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef { val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher) override val address: String = "akka:internal:DeadLetterActorRef" @@ -358,7 +368,13 @@ class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef { protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = app.eventHandler.notify(DeadLetter(message, sender)) - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = brokenPromise + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { + app.eventHandler.notify(DeadLetter(message, this)) + brokenPromise + } + + @throws(classOf[java.io.ObjectStreamException]) + private def writeReplace(): AnyRef = DeadLetterActorRef.serialized } abstract class AskActorRef(protected val app: AkkaApplication)(timeout: Timeout = app.AkkaConfig.ActorTimeout, dispatcher: MessageDispatcher = app.dispatcher) extends MinimalActorRef { @@ -373,9 +389,9 @@ abstract class AskActorRef(protected val app: AkkaApplication)(timeout: Timeout protected def whenDone(): Unit protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = message match { - case akka.actor.Status.Success(r) ⇒ result.completeWithResult(r) - case akka.actor.Status.Failure(f) ⇒ result.completeWithException(f) - case other ⇒ result.completeWithResult(other) + case Status.Success(r) ⇒ result.completeWithResult(r) + case Status.Failure(f) ⇒ result.completeWithException(f) + case other ⇒ result.completeWithResult(other) } protected[akka] override def sendSystemMessage(message: SystemMessage): Unit = message match { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 8881d459ed..0d72f9fd09 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -30,6 +30,7 @@ trait ActorRefProvider { private[akka] def evict(address: String): Boolean private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] + private[akka] def serialize(actor: ActorRef): AnyRef private[akka] def createDeathWatch(): DeathWatch