Bye-bye ReflectiveAccess, introducing PropertyMaster, see #1750

- PropertyMaster is the only place in Akka which calls
  ClassLoader.getClass (apart from kernel, which might be special)
- all PropertyMaster methods (there are only three) take a ClassManifest
  of what is to be constructed, and they verify that the obtained object
  is actually compatible with the required type

Other stuff:
- noticed that I had forgotten to change to ExtendedActorSystem when
  constructing Extensions by ExtensionKey (damn you, reflection!)
- moved Serializer.currentSystem into JavaSerializer, because that’s the
  only one needing it (it’s only used in readResolve() methods)
- Serializers are constructed now with one-arg constructor taking
  ExtendedActorSystem (if that exists, otherwise no-arg as before), to
  allow JavaSerializer to do its magic; possibly necessary for others as
  well
- Removed all Option[ClassLoader] signatures
- made it so that the ActorSystem will try context class loader, then
  the class loader which loaded the class actually calling into
  ActorSystem.apply, then the loader which loaded ActorSystemImpl
- for the second of the above I added a (reflectively accessed hopefully
  safe) facility for getting caller Class[_] objects by using
  sun.reflect.Reflection; this is optional an defaults to None, e.g. on
  Android, which means that getting the caller’s classloader is done on
  a best effort basis (there’s nothing we can do because a StackTrace
  does not contain actual Class[_] objects).
- refactored DurableMailbox to contain the owner val and use that
  instead of declaring that in all subclasses
This commit is contained in:
Roland 2012-02-09 11:56:43 +01:00
parent 15fa414d46
commit 2ce47d6bb5
35 changed files with 322 additions and 300 deletions

View file

@ -40,9 +40,9 @@ public class JavaExtension {
static final ExtensionKey<OtherExtension> key = new ExtensionKey<OtherExtension>(OtherExtension.class) {
};
public final ActorSystemImpl system;
public final ExtendedActorSystem system;
public OtherExtension(ActorSystemImpl i) {
public OtherExtension(ExtendedActorSystem i) {
system = i;
}
}

View file

@ -11,11 +11,10 @@ import akka.testkit._
import akka.util.Timeout
import akka.util.duration._
import java.lang.IllegalStateException
import akka.util.ReflectiveAccess
import akka.serialization.Serialization
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.dispatch.{ Await, DefaultPromise, Promise, Future }
import akka.pattern.ask
import akka.serialization.JavaSerializer
object ActorRefSpec {
@ -240,6 +239,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
"be serializable using Java Serialization on local node" in {
val a = system.actorOf(Props[InnerActor])
val esys = system.asInstanceOf[ExtendedActorSystem]
import java.io._
@ -251,14 +251,21 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
out.flush
out.close
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
val bytes = baos.toByteArray
JavaSerializer.currentSystem.withValue(esys) {
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
val readA = in.readObject
a.isInstanceOf[LocalActorRef] must be === true
readA.isInstanceOf[LocalActorRef] must be === true
(readA eq a) must be === true
}
val ser = new JavaSerializer(esys)
val readA = ser.fromBinary(bytes, None)
readA.isInstanceOf[LocalActorRef] must be === true
(readA eq a) must be === true
}
"throw an exception on deserialize if no system in scope" in {
@ -297,7 +304,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
out.flush
out.close
Serialization.currentSystem.withValue(sysImpl) {
JavaSerializer.currentSystem.withValue(sysImpl) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream)
}

View file

@ -8,7 +8,6 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
import akka.serialization.Serialization
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
@ -19,6 +18,7 @@ import akka.japi.{ Creator, Option ⇒ JOption }
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Await, Dispatchers, Future, Promise }
import akka.pattern.ask
import akka.serialization.JavaSerializer
object TypedActorSpec {
@ -367,7 +367,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
"be able to serialize and deserialize invocations" in {
import java.io._
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
JavaSerializer.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]())
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
@ -386,7 +386,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
"be able to serialize and deserialize invocations' parameters" in {
import java.io._
val someFoo: Foo = new Bar
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
JavaSerializer.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)

View file

@ -78,7 +78,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
deserialize(b.asInstanceOf[Array[Byte]], classOf[Address], None) match {
deserialize(b.asInstanceOf[Array[Byte]], classOf[Address]) match {
case Left(exception) fail(exception)
case Right(add) assert(add === addr)
}
@ -90,7 +90,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
deserialize(b.asInstanceOf[Array[Byte]], classOf[Person], None) match {
deserialize(b.asInstanceOf[Array[Byte]], classOf[Person]) match {
case Left(exception) fail(exception)
case Right(p) assert(p === person)
}
@ -103,7 +103,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
deserialize(b.asInstanceOf[Array[Byte]], classOf[Record], None) match {
deserialize(b.asInstanceOf[Array[Byte]], classOf[Record]) match {
case Left(exception) fail(exception)
case Right(p) assert(p === r)
}
@ -135,7 +135,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray))
Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) {
JavaSerializer.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) {
val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef]
(deadLetters eq a.deadLetters) must be(true)
}
@ -248,8 +248,5 @@ class TestSerializer extends Serializer {
Array.empty[Byte]
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None,
classLoader: Option[ClassLoader] = None): AnyRef = {
null
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
}

View file

@ -220,7 +220,7 @@ private[akka] class ActorCell(
val ser = SerializationExtension(system)
ser.serialize(props.creator) match {
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, props.creator.getClass, None) match {
case Right(bytes) ser.deserialize(bytes, props.creator.getClass) match {
case Left(t) throw t
case _ //All good
}

View file

@ -334,7 +334,7 @@ private[akka] class LocalActorRef private[akka] (
* Memento pattern for serializing ActorRefs transparently
*/
case class SerializedActorRef private (path: String) {
import akka.serialization.Serialization.currentSystem
import akka.serialization.JavaSerializer.currentSystem
@throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = currentSystem.value match {
@ -399,7 +399,7 @@ case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)
private[akka] object DeadLetterActorRef {
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
@throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = Serialization.currentSystem.value.deadLetters
private def readResolve(): AnyRef = akka.serialization.JavaSerializer.currentSystem.value.deadLetters
}
val serialized = new SerializedDeadLetterActorRef

View file

@ -318,12 +318,12 @@ class LocalActorRefProvider(
settings: ActorSystem.Settings,
eventStream: EventStream,
scheduler: Scheduler,
classloader: ClassLoader) =
propertyMaster: PropertyMaster) =
this(_systemName,
settings,
eventStream,
scheduler,
new Deployer(settings, classloader))
new Deployer(settings, propertyMaster))
val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))

View file

@ -324,13 +324,13 @@ abstract class ExtendedActorSystem extends ActorSystem {
def deathWatch: DeathWatch
/**
* ClassLoader which is used for reflective accesses internally. This is set
* to the context class loader, if one is set, or the class loader which
* ClassLoader wrapper which is used for reflective accesses internally. This is set
* to use the context class loader, if one is set, or the class loader which
* loaded the ActorSystem implementation. The context class loader is also
* set on all threads created by the ActorSystem, if one was set during
* creation.
*/
def internalClassLoader: ClassLoader
def propertyMaster: PropertyMaster
}
class ActorSystemImpl(val name: String, applicationConfig: Config) extends ExtendedActorSystem {
@ -356,6 +356,34 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
final val threadFactory: MonitorableThreadFactory =
MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader), uncaughtExceptionHandler)
/**
* This is an extension point: by overriding this method, subclasses can
* control all reflection activities of an actor system.
*/
protected def createPropertyMaster(): PropertyMaster = new DefaultPropertyMaster(findClassLoader)
protected def findClassLoader: ClassLoader =
Option(Thread.currentThread.getContextClassLoader) orElse
(Reflect.getCallerClass map findCaller) getOrElse
getClass.getClassLoader
private def findCaller(get: Int Class[_]): ClassLoader = {
val frames = Iterator.from(2).map(get)
frames dropWhile { c
c != null &&
(c.getName.startsWith("akka.actor.ActorSystem") ||
c.getName.startsWith("scala.Option") ||
c.getName.startsWith("scala.collection.Iterator") ||
c.getName.startsWith("akka.util.Reflect"))
} next () match {
case null getClass.getClassLoader
case c c.getClassLoader
}
}
private val _pm: PropertyMaster = createPropertyMaster()
def propertyMaster: PropertyMaster = _pm
def logConfiguration(): Unit = log.info(settings.toString)
protected def systemImpl = this
@ -406,17 +434,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
val scheduler: Scheduler = createScheduler()
val internalClassLoader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader
val provider: ActorRefProvider = {
val arguments = Seq(
classOf[String] -> name,
classOf[Settings] -> settings,
classOf[EventStream] -> eventStream,
classOf[Scheduler] -> scheduler,
classOf[ClassLoader] -> internalClassLoader)
classOf[PropertyMaster] -> propertyMaster)
ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, internalClassLoader) match {
propertyMaster.getInstanceFor[ActorRefProvider](ProviderClass, arguments) match {
case Left(e) throw e
case Right(p) p
}
@ -438,7 +464,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
def locker: Locker = provider.locker
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, deadLetterMailbox, scheduler, internalClassLoader))
threadFactory, eventStream, deadLetterMailbox, scheduler, propertyMaster))
val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher
@ -557,8 +583,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
private def loadExtensions() {
import scala.collection.JavaConversions._
settings.config.getStringList("akka.extensions") foreach { fqcn
import ReflectiveAccess.{ getObjectFor, createInstance, noParams, noArgs }
getObjectFor[AnyRef](fqcn, internalClassLoader).fold(_ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
propertyMaster.getObjectFor[AnyRef](fqcn).fold(_ propertyMaster.getInstanceFor[AnyRef](fqcn, Seq()), Right(_)) match {
case Right(p: ExtensionIdProvider) registerExtension(p.lookup());
case Right(p: ExtensionId[_]) registerExtension(p);
case Right(other) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)

View file

@ -8,7 +8,6 @@ import akka.util.Duration
import com.typesafe.config._
import akka.routing._
import java.util.concurrent.{ TimeUnit, ConcurrentHashMap }
import akka.util.ReflectiveAccess
/**
* This class represents deployment configuration for a given actor path. It is
@ -83,7 +82,7 @@ case object NoScopeGiven extends Scope {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader) {
class Deployer(val settings: ActorSystem.Settings, val propertyMaster: PropertyMaster) {
import scala.collection.JavaConverters._
@ -125,7 +124,7 @@ class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader)
case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer)
case fqn
val args = Seq(classOf[Config] -> deployment)
ReflectiveAccess.createInstance[RouterConfig](fqn, args, classloader) match {
propertyMaster.getInstanceFor[RouterConfig](fqn, args) match {
case Right(router) router
case Left(exception)
throw new IllegalArgumentException(

View file

@ -3,8 +3,6 @@
*/
package akka.actor
import akka.util.ReflectiveAccess
/**
* The basic ActorSystem covers all that is needed for locally running actors,
* using futures and so on. In addition, more features can hook into it and
@ -73,12 +71,12 @@ trait ExtensionIdProvider {
/**
* This is a one-stop-shop if all you want is an extension which is
* constructed with the ActorSystemImpl as its only constructor argument:
* constructed with the ExtendedActorSystem as its only constructor argument:
*
* {{{
* object MyExt extends ExtensionKey[Ext]
*
* class Ext(system: ActorSystemImpl) extends MyExt {
* class Ext(system: ExtendedActorSystem) extends MyExt {
* ...
* }
* }}}
@ -89,7 +87,7 @@ trait ExtensionIdProvider {
* public class MyExt extends Extension {
* static final ExtensionKey<MyExt> key = new ExtensionKey<MyExt>(MyExt.class);
*
* public MyExt(ActorSystemImpl system) {
* public MyExt(ExtendedActorSystem system) {
* ...
* }
* }}}
@ -99,7 +97,7 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassManifest[T]) extend
override def lookup(): ExtensionId[T] = this
def createExtension(system: ExtendedActorSystem): T =
ReflectiveAccess.createInstance[T](m.erasure, Array[Class[_]](classOf[ActorSystemImpl]), Array[AnyRef](system)) match {
PropertyMaster.getInstanceFor[T](m.erasure, Seq(classOf[ExtendedActorSystem] -> system)) match {
case Left(ex) throw ex
case Right(r) r
}

View file

@ -0,0 +1,102 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.util.NonFatal
import java.lang.reflect.InvocationTargetException
/**
* The property master is responsible for acquiring all props needed for a
* performance; in Akka this is the class which is used for reflectively
* loading all configurable parts of an actor system.
*/
trait PropertyMaster {
def getClassFor[T: ClassManifest](fqcn: String): Either[Throwable, Class[_ <: T]]
def getInstanceFor[T: ClassManifest](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T]
def getObjectFor[T: ClassManifest](fqcn: String): Either[Throwable, T]
/**
* This is needed e.g. by the JavaSerializer to build the ObjectInputStream.
*/
def classLoader: ClassLoader
}
object PropertyMaster {
def getInstanceFor[T: ClassManifest](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = {
val types = args.map(_._1).toArray
val values = args.map(_._2).toArray
withErrorHandling {
val constructor = clazz.getDeclaredConstructor(types: _*)
constructor.setAccessible(true)
val obj = constructor.newInstance(values: _*).asInstanceOf[T]
val t = classManifest[T].erasure
if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(clazz + " is not a subtype of " + t))
}
}
/**
* Caught exception is returned as Left(exception).
* Unwraps `InvocationTargetException` if its getTargetException is an `Exception`.
* Other `Throwable`, such as `Error` is thrown.
*/
@inline
final def withErrorHandling[T](body: Either[Throwable, T]): Either[Throwable, T] =
try body catch {
case e: InvocationTargetException
e.getTargetException match {
case NonFatal(t) Left(t)
case t throw t
}
case NonFatal(e) Left(e)
}
}
class DefaultPropertyMaster(val classLoader: ClassLoader) extends PropertyMaster {
import PropertyMaster.withErrorHandling
override def getClassFor[T: ClassManifest](fqcn: String): Either[Throwable, Class[_ <: T]] =
try {
val c = classLoader.loadClass(fqcn).asInstanceOf[Class[_ <: T]]
val t = classManifest[T].erasure
if (t.isAssignableFrom(c)) Right(c) else Left(new ClassCastException(t + " is not assignable from " + c))
} catch {
case NonFatal(e) Left(e)
}
override def getInstanceFor[T: ClassManifest](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] =
getClassFor(fqcn).fold(Left(_), { c
val types = args.map(_._1).toArray
val values = args.map(_._2).toArray
withErrorHandling {
val constructor = c.getDeclaredConstructor(types: _*)
constructor.setAccessible(true)
val obj = constructor.newInstance(values: _*)
val t = classManifest[T].erasure
if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(fqcn + " is not a subtype of " + t))
}
})
override def getObjectFor[T: ClassManifest](fqcn: String): Either[Throwable, T] = {
getClassFor(fqcn).fold(Left(_), { c
withErrorHandling {
val module = c.getDeclaredField("MODULE$")
module.setAccessible(true)
val t = classManifest[T].erasure
module.get(null) match {
case null Left(new NullPointerException)
case x if !t.isInstance(x) Left(new ClassCastException(fqcn + " is not a subtype of " + t))
case x Right(x.asInstanceOf[T])
}
}
})
}
}

View file

@ -129,7 +129,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
val serializedParameters = Array.ofDim[(Int, Class[_], Array[Byte])](ps.length)
for (i 0 until ps.length) {
val p = ps(i)
val s = SerializationExtension(Serialization.currentSystem.value).findSerializerFor(p)
val system = akka.serialization.JavaSerializer.currentSystem.value
val s = SerializationExtension(system).findSerializerFor(p)
val m = if (s.includeManifest) p.getClass else null
serializedParameters(i) = (s.identifier, m, s toBinary parameters(i)) //Mutable for the sake of sanity
}
@ -146,7 +147,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
//TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = {
val system = akka.serialization.Serialization.currentSystem.value
val system = akka.serialization.JavaSerializer.currentSystem.value
if (system eq null) throw new IllegalStateException(
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
@ -158,7 +159,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
for (i 0 until a.length) {
val (sId, manifest, bytes) = a(i)
deserializedParameters(i) = serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest))
deserializedParameters(i) =
serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest))
}
deserializedParameters

View file

@ -12,7 +12,6 @@ import akka.actor.ActorSystem
import scala.annotation.tailrec
import akka.event.EventStream
import com.typesafe.config.Config
import akka.util.ReflectiveAccess
import akka.serialization.SerializationExtension
import akka.jsr166y.ForkJoinPool
import akka.util.NonFatal
@ -26,7 +25,7 @@ final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorS
val ser = SerializationExtension(system)
ser.serialize(msg) match { //Verify serializability
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, msg.getClass, None) match { //Verify deserializability
case Right(bytes) ser.deserialize(bytes, msg.getClass) match { //Verify deserializability
case Left(t) throw t
case _ //All good
}
@ -369,7 +368,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
}
case fqcn
val args = Seq(classOf[Config] -> config)
ReflectiveAccess.createInstance[MailboxType](fqcn, args, prerequisites.classloader) match {
prerequisites.propertyMaster.getInstanceFor[MailboxType](fqcn, args) match {
case Right(instance) instance
case Left(exception)
throw new IllegalArgumentException(
@ -385,8 +384,10 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
case null | "" | "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case fqcn
val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites])
ReflectiveAccess.createInstance[ExecutorServiceConfigurator](fqcn, constructorSignature, Array[AnyRef](config, prerequisites), prerequisites.classloader) match {
val args = Seq(
classOf[Config] -> config,
classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.propertyMaster.getInstanceFor[ExecutorServiceConfigurator](fqcn, args) match {
case Right(instance) instance
case Left(exception) throw new IllegalArgumentException(
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],

View file

@ -4,22 +4,24 @@
package akka.dispatch
import akka.actor.newUuid
import akka.util.{ Duration, ReflectiveAccess }
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import java.util.concurrent.{ ConcurrentHashMap, TimeUnit, ThreadFactory }
import scala.collection.JavaConverters.mapAsJavaMapConverter
import com.typesafe.config.{ ConfigFactory, Config }
import Dispatchers.DefaultDispatcherId
import akka.actor.{ Scheduler, PropertyMaster, ActorSystem }
import akka.event.Logging.Warning
import java.util.concurrent.{ ThreadFactory, TimeUnit, ConcurrentHashMap }
import akka.event.EventStream
import akka.util.Duration
trait DispatcherPrerequisites {
def threadFactory: ThreadFactory
def eventStream: EventStream
def deadLetterMailbox: Mailbox
def scheduler: Scheduler
def classloader: ClassLoader
def propertyMaster: PropertyMaster
}
case class DefaultDispatcherPrerequisites(
@ -27,7 +29,7 @@ case class DefaultDispatcherPrerequisites(
val eventStream: EventStream,
val deadLetterMailbox: Mailbox,
val scheduler: Scheduler,
val classloader: ClassLoader) extends DispatcherPrerequisites
val propertyMaster: PropertyMaster) extends DispatcherPrerequisites
object Dispatchers {
/**
@ -137,7 +139,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
case "PinnedDispatcher" new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn
val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](fqn, args, prerequisites.classloader) match {
prerequisites.propertyMaster.getInstanceFor[MessageDispatcherConfigurator](fqn, args) match {
case Right(configurator) configurator
case Left(exception)
throw new IllegalArgumentException(

View file

@ -6,7 +6,6 @@ package akka.event
import akka.actor._
import akka.AkkaException
import akka.actor.ActorSystem.Settings
import akka.util.ReflectiveAccess
import akka.config.ConfigurationException
import akka.util.ReentrantGuard
import akka.util.duration._
@ -101,7 +100,7 @@ trait LoggingBus extends ActorEventBus {
if loggerName != StandardOutLoggerName
} yield {
try {
ReflectiveAccess.getClassFor[Actor](loggerName, system.internalClassLoader) match {
system.propertyMaster.getClassFor[Actor](loggerName) match {
case Right(actorClass) addLogger(system, actorClass, level, logName)
case Left(exception) throw exception
}
@ -350,7 +349,7 @@ object Logging {
object Extension extends ExtensionKey[LogExt]
class LogExt(system: ActorSystemImpl) extends Extension {
class LogExt(system: ExtendedActorSystem) extends Extension {
private val loggerId = new AtomicInteger
def id() = loggerId.incrementAndGet()
}

View file

@ -61,6 +61,7 @@ trait ToBinary[T <: Actor] {
* Type class definition for Actor Serialization.
* Client needs to implement Format[] for the respective actor.
*/
// FIXME RK: should this go? Its not used anywhere, looks like cluster residue.
trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
/**
@ -97,7 +98,7 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable {
val serializer: Serializer
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T]
def fromBinary(bytes: Array[Byte], act: T): T = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T]
def toBinary(ac: T) = serializer.toBinary(ac)
def toBinary(ac: T): Array[Byte] = serializer.toBinary(ac)
}

View file

@ -5,30 +5,16 @@
package akka.serialization
import akka.AkkaException
import akka.util.ReflectiveAccess
import scala.util.DynamicVariable
import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address }
import akka.actor.{ Extension, ExtendedActorSystem, Address }
import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging
case class NoSerializerFoundException(m: String) extends AkkaException(m)
object Serialization {
/**
* This holds a reference to the current ActorSystem (the surrounding context)
* during serialization and deserialization.
*
* If you are using Serializers yourself, outside of SerializationExtension,
* you'll need to surround the serialization/deserialization with:
*
* currentSystem.withValue(system) {
* ...code...
* }
*/
val currentSystem = new DynamicVariable[ActorSystem](null)
/**
* This holds a reference to the current transport address to be inserted
* into local actor refs during serialization.
@ -74,7 +60,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* to either an Array of Bytes or an Exception if one was thrown.
*/
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception Left(e) }
try Right(findSerializerFor(o).toBinary(o))
catch { case e: Exception Left(e) }
/**
* Deserializes the given array of bytes using the specified serializer id,
@ -83,26 +70,18 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
*/
def deserialize(bytes: Array[Byte],
serializerId: Int,
clazz: Option[Class[_]],
classLoader: ClassLoader): Either[Exception, AnyRef] =
try {
currentSystem.withValue(system) {
Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, Some(classLoader)))
}
} catch { case e: Exception Left(e) }
clazz: Option[Class[_]]): Either[Exception, AnyRef] =
try Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
catch { case e: Exception Left(e) }
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* You can specify an optional ClassLoader to load the object into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(
bytes: Array[Byte],
clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
try {
currentSystem.withValue(system) { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) }
} catch { case e: Exception Left(e) }
def deserialize(bytes: Array[Byte], clazz: Class[_]): Either[Exception, AnyRef] =
try Right(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
catch { case e: Exception Left(e) }
/**
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null,
@ -149,8 +128,11 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
/**
* Tries to load the specified Serializer by the FQN
*/
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs)
def serializerOf(serializerFQN: String): Either[Throwable, Serializer] = {
val pm = system.propertyMaster
pm.getInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system))
.fold(_ pm.getInstanceFor[Serializer](serializerFQN, Seq()), Right(_))
}
/**
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)

View file

@ -6,11 +6,29 @@ package akka.serialization
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }
import akka.util.ClassLoaderObjectInputStream
import akka.actor.PropertyMaster
import akka.actor.ExtendedActorSystem
import scala.util.DynamicVariable
/**
* A Serializer represents a bimap between an object and an array of bytes representing that object
* A Serializer represents a bimap between an object and an array of bytes representing that object.
*
* Serializers are loaded using reflection during [[akka.actor.ActorSystem]]
* start-up, where two constructors are tried in order:
*
* <ul>
* <li>taking exactly one argument of type [[akka.actor.ExtendedActorSystem]];
* this should be the preferred one because all reflective loading of classes
* during deserialization should use ExtendedActorSystem.propertyMaster (see
* [[akka.actor.PropertyMaster]]), and</li>
* <li>without arguments, which is only an option if the serializer does not
* load classes using reflection.</li>
* </ul>
*
* <b>Be sure to always use the PropertyManager for loading classes!</b>
*/
trait Serializer extends scala.Serializable {
/**
* Completely unique value to identify this implementation of Serializer, used to optimize network traffic
* Values from 0 to 16 is reserved for Akka internal usage
@ -28,42 +46,52 @@ trait Serializer extends scala.Serializable {
def includeManifest: Boolean
/**
* Deserializes the given Array of Bytes into an AnyRef
* Produces an object from an array of bytes, with an optional type-hint;
* the class should be loaded using ActorSystem.propertyMaster.
*/
def fromBinary(bytes: Array[Byte]): AnyRef = fromBinary(bytes, None, None)
/**
* Deserializes the given Array of Bytes into an AnyRef with an optional type hint
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = fromBinary(bytes, manifest, None)
/**
* Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]], classLoader: Option[ClassLoader]): AnyRef
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef
}
/**
* Java API for creating a Serializer
* Java API for creating a Serializer: make sure to include a constructor which
* takes exactly one argument of type [[akka.actor.ExtendedActorSystem]], because
* that is the preferred constructor which will be invoked when reflectively instantiating
* the JSerializer (also possible with empty constructor).
*/
abstract class JSerializer extends Serializer {
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef =
fromBinary(bytes, manifest.orNull, classLoader.orNull)
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
fromBinary(bytes, manifest.orNull)
/**
* This method should be overridden,
* manifest and classLoader may be null.
*/
def fromBinary(bytes: Array[Byte], manifest: Class[_], classLoader: ClassLoader): AnyRef
def fromBinary(bytes: Array[Byte], manifest: Class[_]): AnyRef
}
object JavaSerializer extends JavaSerializer
object NullSerializer extends NullSerializer
object JavaSerializer {
/**
* This holds a reference to the current ActorSystem (the surrounding context)
* during serialization and deserialization.
*
* If you are using Serializers yourself, outside of SerializationExtension,
* you'll need to surround the serialization/deserialization with:
*
* currentSystem.withValue(system) {
* ...code...
* }
*/
val currentSystem = new DynamicVariable[ExtendedActorSystem](null)
}
/**
* This Serializer uses standard Java Serialization
*/
class JavaSerializer extends Serializer {
class JavaSerializer(val system: ExtendedActorSystem) extends Serializer {
def includeManifest: Boolean = false
@ -77,12 +105,11 @@ class JavaSerializer extends Serializer {
bos.toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None,
classLoader: Option[ClassLoader] = None): AnyRef = {
val in =
if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else
new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = in.readObject
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val in = new ClassLoaderObjectInputStream(system.propertyMaster.classLoader, new ByteArrayInputStream(bytes))
val obj = JavaSerializer.currentSystem.withValue(system) {
in.readObject
}
in.close()
obj
}
@ -96,5 +123,5 @@ class NullSerializer extends Serializer {
def includeManifest: Boolean = false
def identifier = 0
def toBinary(o: AnyRef) = nullAsBytes
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = null
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
}

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
object Reflect {
val getCallerClass: Option[Int Class[_]] = {
try {
val c = Class.forName("sun.reflect.Reflection");
val m = c.getMethod("getCallerClass", Array(classOf[Int]): _*)
Some((i: Int) m.invoke(null, Array[AnyRef](i.asInstanceOf[Integer]): _*).asInstanceOf[Class[_]])
} catch {
case NonFatal(e) None
}
}
}

View file

@ -1,126 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import java.lang.reflect.InvocationTargetException
object ReflectiveAccess {
val loader = getClass.getClassLoader
val noParams: Array[Class[_]] = Array()
val noArgs: Array[AnyRef] = Array()
def createInstance[T](clazz: Class[_],
params: Array[Class[_]],
args: Array[AnyRef]): Either[Exception, T] = withErrorHandling {
assert(clazz ne null)
assert(params ne null)
assert(args ne null)
val ctor = clazz.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Right(ctor.newInstance(args: _*).asInstanceOf[T])
}
def createInstance[T](fqn: String,
params: Array[Class[_]],
args: Array[AnyRef],
classloader: ClassLoader = loader): Either[Exception, T] = withErrorHandling {
assert(params ne null)
assert(args ne null)
getClassFor(fqn, classloader) match {
case Right(value)
val ctor = value.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Right(ctor.newInstance(args: _*).asInstanceOf[T])
case Left(exception) Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
}
}
def createInstance[T](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Exception, T] =
createInstance(clazz, args.map(_._1).toArray, args.map(_._2).toArray)
def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)], classloader: ClassLoader): Either[Exception, T] =
createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, classloader)
def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Exception, T] =
createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, loader)
//Obtains a reference to fqn.MODULE$
def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try {
getClassFor(fqn, classloader) match {
case Right(value)
val instance = value.getDeclaredField("MODULE$")
instance.setAccessible(true)
val obj = instance.get(null)
if (obj eq null) Left(new NullPointerException) else Right(obj.asInstanceOf[T])
case Left(exception) Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
}
} catch {
case e: Exception
Left(e)
}
def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, Class[T]] = try {
assert(fqn ne null)
// First, use the specified CL
val first = try {
Right(classloader.loadClass(fqn).asInstanceOf[Class[T]])
} catch {
case c: ClassNotFoundException Left(c)
}
if (first.isRight) first
else {
// Second option is to use the ContextClassLoader
val second = try {
Right(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
} catch {
case c: ClassNotFoundException Left(c)
}
if (second.isRight) second
else {
val third = try {
if (classloader ne loader) Right(loader.loadClass(fqn).asInstanceOf[Class[T]]) else Left(null) //Horrid
} catch {
case c: ClassNotFoundException Left(c)
}
if (third.isRight) third
else {
try {
Right(Class.forName(fqn).asInstanceOf[Class[T]]) // Last option is Class.forName
} catch {
case c: ClassNotFoundException Left(c)
}
}
}
}
} catch {
case e: Exception Left(e)
}
/**
* Caught exception is returned as Left(exception).
* Unwraps `InvocationTargetException` if its getTargetException is an `Exception`.
* Other `Throwable`, such as `Error` is thrown.
*/
@inline
private final def withErrorHandling[T](body: Either[Exception, T]): Either[Exception, T] = {
try {
body
} catch {
case e: InvocationTargetException e.getTargetException match {
case t: Exception Left(t)
case t throw t
}
case e: Exception
Left(e)
}
}
}

View file

@ -3,6 +3,7 @@
*/
package akka.docs.serialization;
import akka.japi.Option;
import akka.serialization.JSerializer;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
@ -13,6 +14,7 @@ import static org.junit.Assert.*;
import akka.serialization.*;
import akka.actor.ActorSystem;
import akka.actor.PropertyMaster;
import com.typesafe.config.*;
//#imports
@ -45,8 +47,7 @@ public class SerializationDocTestBase {
// using the type hint (if any, see "includeManifest" above)
// into the optionally provided classLoader.
@Override public Object fromBinary(byte[] bytes,
Class clazz,
ClassLoader classLoader) {
Class clazz) {
// Put your code that deserializes here
//#...
return null;
@ -128,7 +129,7 @@ public class SerializationDocTestBase {
// Turn it back into an object,
// the nulls are for the class manifest and for the classloader
String back = (String)serializer.fromBinary(bytes);
String back = (String)serializer.fromBinary(bytes, Option.<Class<?>>none().asScala());
// Voilá!
assertEquals(original, back);

View file

@ -6,7 +6,7 @@ package akka.docs.serialization
import org.scalatest.matchers.MustMatchers
import akka.testkit._
//#imports
import akka.actor.ActorSystem
import akka.actor.{ ActorSystem, PropertyMaster }
import akka.serialization._
import com.typesafe.config.ConfigFactory
@ -35,8 +35,7 @@ class MyOwnSerializer extends Serializer {
// using the type hint (if any, see "includeManifest" above)
// into the optionally provided classLoader.
def fromBinary(bytes: Array[Byte],
clazz: Option[Class[_]],
classLoader: Option[ClassLoader] = None): AnyRef = {
clazz: Option[Class[_]]): AnyRef = {
// Put your code that deserializes here
//#...
null
@ -147,9 +146,7 @@ class SerializationDocSpec extends AkkaSpec {
val bytes = serializer.toBinary(original)
// Turn it back into an object
val back = serializer.fromBinary(bytes,
manifest = None,
classLoader = None)
val back = serializer.fromBinary(bytes, manifest = None)
// Voilá!
back must equal(original)

View file

@ -22,7 +22,7 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BeanstalkBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
private val settings = BeanstalkBasedMailboxExtension(owner.system)
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt

View file

@ -17,7 +17,7 @@ class FileBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new FileBasedMailbox(owner)
}
class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
val log = Logging(system, "FileBasedMailbox")

View file

@ -12,7 +12,7 @@ private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
}
abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue {
abstract class DurableMailbox(val owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue {
import DurableExecutableMailboxConfig._
def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem]
@ -22,15 +22,13 @@ abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner)
}
trait DurableMessageSerialization {
def owner: ActorContext
trait DurableMessageSerialization { this: DurableMailbox
def serialize(durableMessage: Envelope): Array[Byte] = {
def serializeActorRef(ref: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(ref.path.toString).build
val message = MessageSerializer.serialize(owner.system, durableMessage.message.asInstanceOf[AnyRef])
val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef])
val builder = RemoteMessageProtocol.newBuilder
.setMessage(message)
.setRecipient(serializeActorRef(owner.self))
@ -41,13 +39,13 @@ trait DurableMessageSerialization {
def deserialize(bytes: Array[Byte]): Envelope = {
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = owner.system.actorFor(refProtocol.getPath)
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = system.actorFor(refProtocol.getPath)
val durableMessage = RemoteMessageProtocol.parseFrom(bytes)
val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage, getClass.getClassLoader)
val message = MessageSerializer.deserialize(system, durableMessage.getMessage)
val sender = deserializeActorRef(durableMessage.getSender)
new Envelope(message, sender)(owner.system)
new Envelope(message, sender)(system)
}
}

View file

@ -65,7 +65,7 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB
val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument]
system.log.debug("Deserializing a durable message from MongoDB: {}", doc)
val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData)
val msg = MessageSerializer.deserialize(system, msgData, system.internalClassLoader)
val msg = MessageSerializer.deserialize(system, msgData)
val ownerPath = doc.as[String]("ownerPath")
val senderPath = doc.as[String]("senderPath")
val sender = system.actorFor(senderPath)

View file

@ -32,7 +32,7 @@ class MongoBasedMailboxType(config: Config) extends MailboxType {
*
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
*/
class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) {
class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) {
// this implicit object provides the context for reading/writing things as MongoDurableMessage
implicit val mailboxBSONSer = new BSONSerializableMailbox(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!

View file

@ -19,7 +19,7 @@ class RedisBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new RedisBasedMailbox(owner)
}
class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system)

View file

@ -20,7 +20,7 @@ class ZooKeeperBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new ZooKeeperBasedMailbox(owner)
}
class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system)
val queueNode = "/queues"

View file

@ -6,32 +6,25 @@ package akka.remote
import akka.remote.RemoteProtocol._
import com.google.protobuf.ByteString
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializationExtension
import akka.util.ReflectiveAccess
object MessageSerializer {
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: ClassLoader): AnyRef = {
val clazz = if (messageProtocol.hasMessageManifest) {
Option(ReflectiveAccess.getClassFor[AnyRef](
messageProtocol.getMessageManifest.toStringUtf8,
classLoader) match {
case Left(e) throw e
case Right(r) r
})
} else None
SerializationExtension(system).deserialize(
messageProtocol.getMessage.toByteArray,
messageProtocol.getSerializerId,
clazz,
classLoader) match {
def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = {
val clazz =
if (messageProtocol.hasMessageManifest) {
system.propertyMaster.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8)
.fold(throw _, Some(_))
} else None
SerializationExtension(system)
.deserialize(messageProtocol.getMessage.toByteArray, messageProtocol.getSerializerId, clazz) match {
case Left(e) throw e
case Right(r) r
}
}
def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = {
def serialize(system: ExtendedActorSystem, message: AnyRef): MessageProtocol = {
val s = SerializationExtension(system)
val serializer = s.findSerializerFor(message)
val builder = MessageProtocol.newBuilder

View file

@ -12,7 +12,6 @@ import akka.event.EventStream
import akka.config.ConfigurationException
import java.util.concurrent.{ TimeoutException }
import com.typesafe.config.Config
import akka.util.ReflectiveAccess
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
@ -28,11 +27,11 @@ class RemoteActorRefProvider(
val settings: ActorSystem.Settings,
val eventStream: EventStream,
val scheduler: Scheduler,
val classloader: ClassLoader) extends ActorRefProvider {
val propertyMaster: PropertyMaster) extends ActorRefProvider {
val remoteSettings = new RemoteSettings(settings.config, systemName)
val deployer = new RemoteDeployer(settings, classloader)
val deployer = new RemoteDeployer(settings, propertyMaster)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer)
@ -84,7 +83,7 @@ class RemoteActorRefProvider(
classOf[ActorSystemImpl] -> system,
classOf[RemoteActorRefProvider] -> this)
ReflectiveAccess.createInstance[RemoteTransport](fqn, args, system.internalClassLoader) match {
system.propertyMaster.getInstanceFor[RemoteTransport](fqn, args) match {
case Left(problem) throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem)
case Right(remote) remote
}

View file

@ -12,7 +12,7 @@ case class RemoteScope(node: Address) extends Scope {
def withFallback(other: Scope): Scope = this
}
class RemoteDeployer(_settings: ActorSystem.Settings, _classloader: ClassLoader) extends Deployer(_settings, _classloader) {
class RemoteDeployer(_settings: ActorSystem.Settings, _pm: PropertyMaster) extends Deployer(_settings, _pm) {
override protected def parseConfig(path: String, config: Config): Option[Deploy] = {
import scala.collection.JavaConverters._

View file

@ -219,7 +219,7 @@ class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) {
lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver)
lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, getClass.getClassLoader)
lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage)
override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender
}

View file

@ -5,6 +5,7 @@
package akka.serialization
import com.google.protobuf.Message
import akka.actor.PropertyMaster
/**
* This Serializer serializes `com.google.protobuf.Message`s
@ -19,7 +20,7 @@ class ProtobufSerializer extends Serializer {
case _ throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]")
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef =
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
clazz match {
case None throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
case Some(c) c.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]

View file

@ -5,7 +5,7 @@
package akka.testkit
import akka.actor._
import akka.util.{ ReflectiveAccess, Duration }
import akka.util.Duration
import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable.Stack
import akka.dispatch._
@ -121,8 +121,7 @@ object TestActorRef {
def apply[T <: Actor](implicit m: Manifest[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)
def apply[T <: Actor](name: String)(implicit m: Manifest[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({
import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance[T](m.erasure, noParams, noArgs) match {
PropertyMaster.getInstanceFor[T](m.erasure, Seq()) match {
case Right(value) value
case Left(exception) throw new ActorInitializationException(null,
"Could not instantiate Actor" +