#1320 - Implementing readResolve and writeReplace for the DeadLetterActorRef

This commit is contained in:
Viktor Klang 2011-11-01 11:20:02 +01:00
parent 6e5de8bb23
commit 3aed09c43d
3 changed files with 39 additions and 4 deletions

View file

@ -7,6 +7,9 @@ package akka.serialization
import akka.serialization.Serialization._
import scala.reflect._
import akka.testkit.AkkaSpec
import akka.AkkaApplication
import java.io.{ ObjectInputStream, ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream }
import akka.actor.DeadLetterActorRef
object SerializeSpec {
@BeanInfo
@ -61,5 +64,20 @@ class SerializeSpec extends AkkaSpec {
case Right(p) assert(p === r)
}
}
"serialize DeadLetterActorRef" in {
val outbuf = new ByteArrayOutputStream()
val out = new ObjectOutputStream(outbuf)
val a = new AkkaApplication()
out.writeObject(a.deadLetters)
out.flush()
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray))
Serialization.app.withValue(a) {
val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef]
deadLetters must be(a.deadLetters)
}
}
}
}

View file

@ -11,6 +11,7 @@ import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.AkkaApplication
import akka.event.ActorEventBus
import akka.serialization.Serialization
import akka.actor.DeadLetterActorRef.SerializedDeadLetterActorRef
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -350,6 +351,15 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef {
case class DeadLetter(message: Any, sender: ActorRef)
object DeadLetterActorRef {
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
@throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = Serialization.app.value.deadLetters
}
val serialized = new SerializedDeadLetterActorRef
}
class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef {
val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher)
override val address: String = "akka:internal:DeadLetterActorRef"
@ -358,7 +368,13 @@ class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef {
protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = app.eventHandler.notify(DeadLetter(message, sender))
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = brokenPromise
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
app.eventHandler.notify(DeadLetter(message, this))
brokenPromise
}
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
abstract class AskActorRef(protected val app: AkkaApplication)(timeout: Timeout = app.AkkaConfig.ActorTimeout, dispatcher: MessageDispatcher = app.dispatcher) extends MinimalActorRef {
@ -373,9 +389,9 @@ abstract class AskActorRef(protected val app: AkkaApplication)(timeout: Timeout
protected def whenDone(): Unit
protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = message match {
case akka.actor.Status.Success(r) result.completeWithResult(r)
case akka.actor.Status.Failure(f) result.completeWithException(f)
case other result.completeWithResult(other)
case Status.Success(r) result.completeWithResult(r)
case Status.Failure(f) result.completeWithException(f)
case other result.completeWithResult(other)
}
protected[akka] override def sendSystemMessage(message: SystemMessage): Unit = message match {

View file

@ -30,6 +30,7 @@ trait ActorRefProvider {
private[akka] def evict(address: String): Boolean
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef]
private[akka] def serialize(actor: ActorRef): AnyRef
private[akka] def createDeathWatch(): DeathWatch