Making the MethodCall:s serializable so that they can be stored in durable mailboxes, and can be sent to remote nodes

This commit is contained in:
Viktor Klang 2011-05-18 17:10:22 +02:00
parent 404118c4d3
commit 07acfb49ee

View file

@ -3,10 +3,10 @@ package akka.thaipedactor
import akka.japi.{Creator, Option => JOption}
import akka.actor.Actor.{actorOf, futureToAnyOptionAsTypedOption}
import akka.transactor.annotation.{Coordinated => CoordinatedAnnotation}
import akka.util.Duration
import akka.dispatch.{MessageDispatcher, Dispatchers, AlreadyCompletedFuture, Future}
import akka.actor.{ActorRef, Actor}
import java.lang.reflect.{InvocationTargetException, Method, InvocationHandler, Proxy}
import akka.util.{Duration}
import akka.actor.{ActorRef, Actor}
object MethodCall {
def isOneWay(method: Method): Boolean = method.getReturnType == java.lang.Void.TYPE
@ -35,6 +35,13 @@ case class MethodCall(method: Method, parameters: Array[AnyRef]) {
} catch {
case i: InvocationTargetException => throw i.getTargetException
}
private def writeReplace(): AnyRef =
new SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, parameters)
}
case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], parameterValues: Array[AnyRef]) {
private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes:_*), parameterValues)
}
object ThaipedActor {
@ -94,17 +101,15 @@ object ThaipedActor {
case class Configuration(timeout: Duration = defaultTimeout, dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
def thaipedActorOf[T <: AnyRef,TI <: T](interface: Class[T], impl: Class[TI], config: Configuration, loader: ClassLoader): T =
newThaipedActor(interface, impl.newInstance.asInstanceOf[TI], config, loader)
configureAndProxy(interface, actorOf(new ThaipedActor[TI](impl.newInstance.asInstanceOf[TI])), config, loader)
def thaipedActorOf[T <: AnyRef,TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration, loader: ClassLoader): T =
newThaipedActor(interface, impl.create, config, loader)
configureAndProxy(interface, actorOf(new ThaipedActor[TI](impl.create)), config, loader)
def thaipedActorOf[T <: AnyRef : Manifest, TI <: T](impl: Creator[TI], config: Configuration, loader: ClassLoader): T =
newThaipedActor[T,TI](implicitly[Manifest[T]].erasure.asInstanceOf[Class[T]], impl.create, config, loader)
protected def newThaipedActor[T <: AnyRef, TI <: T](interface: Class[T], impl: => TI, config: Configuration, loader: ClassLoader): T = {
val actor = actorOf(new ThaipedActor[TI](impl))
configureAndProxy(implicitly[Manifest[T]].erasure.asInstanceOf[Class[T]], actorOf(new ThaipedActor[TI](impl.create)), config, loader)
protected def configureAndProxy[T <: AnyRef](interface: Class[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = {
actor.timeout = config.timeout.toMillis
actor.dispatcher = config.dispatcher