Merge branch 'wip-1750-remove-ReflectiveAccess-∂π'
This commit is contained in:
commit
47741511aa
38 changed files with 398 additions and 407 deletions
|
|
@ -40,10 +40,10 @@ public class JavaExtension {
|
|||
static final ExtensionKey<OtherExtension> key = new ExtensionKey<OtherExtension>(OtherExtension.class) {
|
||||
};
|
||||
|
||||
public final ActorSystemImpl system;
|
||||
public final ExtendedActorSystem system;
|
||||
|
||||
public OtherExtension(ActorSystemImpl i) {
|
||||
system = i;
|
||||
public OtherExtension(ExtendedActorSystem system) {
|
||||
this.system = system;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,11 +11,10 @@ import akka.testkit._
|
|||
import akka.util.Timeout
|
||||
import akka.util.duration._
|
||||
import java.lang.IllegalStateException
|
||||
import akka.util.ReflectiveAccess
|
||||
import akka.serialization.Serialization
|
||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||
import akka.dispatch.{ Await, DefaultPromise, Promise, Future }
|
||||
import akka.pattern.ask
|
||||
import akka.serialization.JavaSerializer
|
||||
|
||||
object ActorRefSpec {
|
||||
|
||||
|
|
@ -240,6 +239,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
"be serializable using Java Serialization on local node" in {
|
||||
val a = system.actorOf(Props[InnerActor])
|
||||
val esys = system.asInstanceOf[ExtendedActorSystem]
|
||||
|
||||
import java.io._
|
||||
|
||||
|
|
@ -251,14 +251,21 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
out.flush
|
||||
out.close
|
||||
|
||||
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
val bytes = baos.toByteArray
|
||||
|
||||
JavaSerializer.currentSystem.withValue(esys) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
|
||||
val readA = in.readObject
|
||||
|
||||
a.isInstanceOf[LocalActorRef] must be === true
|
||||
readA.isInstanceOf[LocalActorRef] must be === true
|
||||
(readA eq a) must be === true
|
||||
}
|
||||
|
||||
val ser = new JavaSerializer(esys)
|
||||
val readA = ser.fromBinary(bytes, None)
|
||||
readA.isInstanceOf[LocalActorRef] must be === true
|
||||
(readA eq a) must be === true
|
||||
}
|
||||
|
||||
"throw an exception on deserialize if no system in scope" in {
|
||||
|
|
@ -297,7 +304,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
out.flush
|
||||
out.close
|
||||
|
||||
Serialization.currentSystem.withValue(sysImpl) {
|
||||
JavaSerializer.currentSystem.withValue(sysImpl) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
|
|||
import akka.util.Duration
|
||||
import akka.util.Timeout
|
||||
import akka.util.duration._
|
||||
import akka.serialization.Serialization
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import annotation.tailrec
|
||||
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
|
||||
|
|
@ -19,6 +18,7 @@ import akka.japi.{ Creator, Option ⇒ JOption }
|
|||
import akka.testkit.DefaultTimeout
|
||||
import akka.dispatch.{ Await, Dispatchers, Future, Promise }
|
||||
import akka.pattern.ask
|
||||
import akka.serialization.JavaSerializer
|
||||
|
||||
object TypedActorSpec {
|
||||
|
||||
|
|
@ -367,7 +367,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
|||
|
||||
"be able to serialize and deserialize invocations" in {
|
||||
import java.io._
|
||||
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
JavaSerializer.currentSystem.withValue(system.asInstanceOf[ExtendedActorSystem]) {
|
||||
val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]())
|
||||
val baos = new ByteArrayOutputStream(8192 * 4)
|
||||
val out = new ObjectOutputStream(baos)
|
||||
|
|
@ -386,7 +386,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
|||
"be able to serialize and deserialize invocations' parameters" in {
|
||||
import java.io._
|
||||
val someFoo: Foo = new Bar
|
||||
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
JavaSerializer.currentSystem.withValue(system.asInstanceOf[ExtendedActorSystem]) {
|
||||
val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
|
||||
val baos = new ByteArrayOutputStream(8192 * 4)
|
||||
val out = new ObjectOutputStream(baos)
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
|||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Address], None) match {
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Address]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(add) ⇒ assert(add === addr)
|
||||
}
|
||||
|
|
@ -101,7 +101,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
|||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Person], None) match {
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Person]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(p) ⇒ assert(p === person)
|
||||
}
|
||||
|
|
@ -114,7 +114,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
|||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Record], None) match {
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Record]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(p) ⇒ assert(p === r)
|
||||
}
|
||||
|
|
@ -146,7 +146,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
|||
out.close()
|
||||
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray))
|
||||
Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) {
|
||||
JavaSerializer.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) {
|
||||
val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef]
|
||||
(deadLetters eq a.deadLetters) must be(true)
|
||||
}
|
||||
|
|
@ -285,8 +285,5 @@ class TestSerializer extends Serializer {
|
|||
Array.empty[Byte]
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None,
|
||||
classLoader: Option[ClassLoader] = None): AnyRef = {
|
||||
null
|
||||
}
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
|
||||
}
|
||||
|
|
|
|||
|
|
@ -220,7 +220,7 @@ private[akka] class ActorCell(
|
|||
val ser = SerializationExtension(system)
|
||||
ser.serialize(props.creator) match {
|
||||
case Left(t) ⇒ throw t
|
||||
case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass, None) match {
|
||||
case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match {
|
||||
case Left(t) ⇒ throw t
|
||||
case _ ⇒ //All good
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import akka.dispatch._
|
|||
import akka.util._
|
||||
import scala.collection.immutable.Stack
|
||||
import java.lang.{ UnsupportedOperationException, IllegalStateException }
|
||||
import akka.serialization.Serialization
|
||||
import akka.serialization.{ Serialization, JavaSerializer }
|
||||
import akka.event.EventStream
|
||||
import scala.annotation.tailrec
|
||||
import java.util.concurrent.{ ConcurrentHashMap }
|
||||
|
|
@ -335,7 +335,7 @@ private[akka] class LocalActorRef private[akka] (
|
|||
*/
|
||||
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
|
||||
case class SerializedActorRef private (path: String) {
|
||||
import akka.serialization.Serialization.currentSystem
|
||||
import akka.serialization.JavaSerializer.currentSystem
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
def readResolve(): AnyRef = currentSystem.value match {
|
||||
|
|
@ -401,7 +401,7 @@ private[akka] object DeadLetterActorRef {
|
|||
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
|
||||
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def readResolve(): AnyRef = Serialization.currentSystem.value.deadLetters
|
||||
private def readResolve(): AnyRef = JavaSerializer.currentSystem.value.deadLetters
|
||||
}
|
||||
|
||||
val serialized = new SerializedDeadLetterActorRef
|
||||
|
|
|
|||
|
|
@ -318,12 +318,12 @@ class LocalActorRefProvider(
|
|||
settings: ActorSystem.Settings,
|
||||
eventStream: EventStream,
|
||||
scheduler: Scheduler,
|
||||
classloader: ClassLoader) =
|
||||
dynamicAccess: DynamicAccess) =
|
||||
this(_systemName,
|
||||
settings,
|
||||
eventStream,
|
||||
scheduler,
|
||||
new Deployer(settings, classloader))
|
||||
new Deployer(settings, dynamicAccess))
|
||||
|
||||
val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
|
||||
|
||||
|
|
|
|||
|
|
@ -325,16 +325,16 @@ abstract class ExtendedActorSystem extends ActorSystem {
|
|||
def deathWatch: DeathWatch
|
||||
|
||||
/**
|
||||
* ClassLoader which is used for reflective accesses internally. This is set
|
||||
* to the context class loader, if one is set, or the class loader which
|
||||
* ClassLoader wrapper which is used for reflective accesses internally. This is set
|
||||
* to use the context class loader, if one is set, or the class loader which
|
||||
* loaded the ActorSystem implementation. The context class loader is also
|
||||
* set on all threads created by the ActorSystem, if one was set during
|
||||
* creation.
|
||||
*/
|
||||
def internalClassLoader: ClassLoader
|
||||
def dynamicAccess: DynamicAccess
|
||||
}
|
||||
|
||||
class ActorSystemImpl(val name: String, applicationConfig: Config) extends ExtendedActorSystem {
|
||||
class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Config) extends ExtendedActorSystem {
|
||||
|
||||
if (!name.matches("""^\w+$"""))
|
||||
throw new IllegalArgumentException("invalid ActorSystem name [" + name + "], must contain only word characters (i.e. [a-zA-Z_0-9])")
|
||||
|
|
@ -358,6 +358,35 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
|
|||
final val threadFactory: MonitorableThreadFactory =
|
||||
MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader), uncaughtExceptionHandler)
|
||||
|
||||
/**
|
||||
* This is an extension point: by overriding this method, subclasses can
|
||||
* control all reflection activities of an actor system.
|
||||
*/
|
||||
protected def createDynamicAccess(): DynamicAccess = new ReflectiveDynamicAccess(findClassLoader)
|
||||
|
||||
protected def findClassLoader: ClassLoader = {
|
||||
def findCaller(get: Int ⇒ Class[_]): ClassLoader = {
|
||||
val frames = Iterator.from(2).map(get)
|
||||
frames dropWhile { c ⇒
|
||||
c != null &&
|
||||
(c.getName.startsWith("akka.actor.ActorSystem") ||
|
||||
c.getName.startsWith("scala.Option") ||
|
||||
c.getName.startsWith("scala.collection.Iterator") ||
|
||||
c.getName.startsWith("akka.util.Reflect"))
|
||||
} next () match {
|
||||
case null ⇒ getClass.getClassLoader
|
||||
case c ⇒ c.getClassLoader
|
||||
}
|
||||
}
|
||||
|
||||
Option(Thread.currentThread.getContextClassLoader) orElse
|
||||
(Reflect.getCallerClass map findCaller) getOrElse
|
||||
getClass.getClassLoader
|
||||
}
|
||||
|
||||
private val _pm: DynamicAccess = createDynamicAccess()
|
||||
def dynamicAccess: DynamicAccess = _pm
|
||||
|
||||
def logConfiguration(): Unit = log.info(settings.toString)
|
||||
|
||||
protected def systemImpl = this
|
||||
|
|
@ -408,17 +437,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
|
|||
|
||||
val scheduler: Scheduler = createScheduler()
|
||||
|
||||
val internalClassLoader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader
|
||||
|
||||
val provider: ActorRefProvider = {
|
||||
val arguments = Seq(
|
||||
classOf[String] -> name,
|
||||
classOf[Settings] -> settings,
|
||||
classOf[EventStream] -> eventStream,
|
||||
classOf[Scheduler] -> scheduler,
|
||||
classOf[ClassLoader] -> internalClassLoader)
|
||||
classOf[DynamicAccess] -> dynamicAccess)
|
||||
|
||||
ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, internalClassLoader) match {
|
||||
dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments) match {
|
||||
case Left(e) ⇒ throw e
|
||||
case Right(p) ⇒ p
|
||||
}
|
||||
|
|
@ -440,7 +467,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
|
|||
def locker: Locker = provider.locker
|
||||
|
||||
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
|
||||
threadFactory, eventStream, deadLetterMailbox, scheduler, internalClassLoader))
|
||||
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess))
|
||||
|
||||
val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher
|
||||
|
||||
|
|
@ -559,8 +586,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
|
|||
private def loadExtensions() {
|
||||
import scala.collection.JavaConversions._
|
||||
settings.config.getStringList("akka.extensions") foreach { fqcn ⇒
|
||||
import ReflectiveAccess.{ getObjectFor, createInstance, noParams, noArgs }
|
||||
getObjectFor[AnyRef](fqcn, internalClassLoader).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
|
||||
dynamicAccess.getObjectFor[AnyRef](fqcn).fold(_ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()), Right(_)) match {
|
||||
case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup());
|
||||
case Right(p: ExtensionId[_]) ⇒ registerExtension(p);
|
||||
case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import akka.util.Duration
|
|||
import com.typesafe.config._
|
||||
import akka.routing._
|
||||
import java.util.concurrent.{ TimeUnit, ConcurrentHashMap }
|
||||
import akka.util.ReflectiveAccess
|
||||
|
||||
/**
|
||||
* This class represents deployment configuration for a given actor path. It is
|
||||
|
|
@ -86,7 +85,7 @@ case object NoScopeGiven extends Scope {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader) {
|
||||
class Deployer(val settings: ActorSystem.Settings, val dynamicAccess: DynamicAccess) {
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
|
|
@ -128,7 +127,7 @@ class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader)
|
|||
case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer)
|
||||
case fqn ⇒
|
||||
val args = Seq(classOf[Config] -> deployment)
|
||||
ReflectiveAccess.createInstance[RouterConfig](fqn, args, classloader) match {
|
||||
dynamicAccess.createInstanceFor[RouterConfig](fqn, args) match {
|
||||
case Right(router) ⇒ router
|
||||
case Left(exception) ⇒
|
||||
throw new IllegalArgumentException(
|
||||
|
|
|
|||
129
akka-actor/src/main/scala/akka/actor/DynamicAccess.scala
Normal file
129
akka-actor/src/main/scala/akka/actor/DynamicAccess.scala
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
import akka.util.NonFatal
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
|
||||
/**
|
||||
* The DynamicAccess implementation is the class which is used for
|
||||
* loading all configurable parts of an actor system (the
|
||||
* [[akka.actor.ReflectiveDynamicAccess]] is the default implementation).
|
||||
*
|
||||
* This is an internal facility and users are not expected to encounter it
|
||||
* unless they are extending Akka in ways which go beyond simple Extensions.
|
||||
*/
|
||||
trait DynamicAccess {
|
||||
|
||||
/**
|
||||
* Convenience method which given a `Class[_]` object and a constructor description
|
||||
* will create a new instance of that class.
|
||||
*
|
||||
* {{{
|
||||
* val obj = DynamicAccess.createInstanceFor(clazz, Seq(classOf[Config] -> config, classOf[String] -> name))
|
||||
* }}}
|
||||
*/
|
||||
def createInstanceFor[T: ClassManifest](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = {
|
||||
val types = args.map(_._1).toArray
|
||||
val values = args.map(_._2).toArray
|
||||
withErrorHandling {
|
||||
val constructor = clazz.getDeclaredConstructor(types: _*)
|
||||
constructor.setAccessible(true)
|
||||
val obj = constructor.newInstance(values: _*).asInstanceOf[T]
|
||||
val t = classManifest[T].erasure
|
||||
if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(clazz + " is not a subtype of " + t))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a `Class[_]` object loaded with the right class loader (i.e. the one
|
||||
* returned by `classLoader`).
|
||||
*/
|
||||
def getClassFor[T: ClassManifest](fqcn: String): Either[Throwable, Class[_ <: T]]
|
||||
|
||||
/**
|
||||
* Obtain an object conforming to the type T, which is expected to be
|
||||
* instantiated from a class designated by the fully-qualified class name
|
||||
* given, where the constructor is selected and invoked according to the
|
||||
* `args` argument. The exact usage of args depends on which type is requested,
|
||||
* see the relevant requesting code for details.
|
||||
*/
|
||||
def createInstanceFor[T: ClassManifest](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T]
|
||||
|
||||
/**
|
||||
* Obtain the Scala “object” instance for the given fully-qualified class name, if there is one.
|
||||
*/
|
||||
def getObjectFor[T: ClassManifest](fqcn: String): Either[Throwable, T]
|
||||
|
||||
/**
|
||||
* This is the class loader to be used in those special cases where the
|
||||
* other factory method are not applicable (e.g. when constructing a ClassLoaderBinaryInputStream).
|
||||
*/
|
||||
def classLoader: ClassLoader
|
||||
|
||||
/**
|
||||
* Caught exception is returned as Left(exception).
|
||||
* Unwraps `InvocationTargetException` if its getTargetException is an `Exception`.
|
||||
* Other `Throwable`, such as `Error` is thrown.
|
||||
*/
|
||||
@inline
|
||||
final def withErrorHandling[T](body: ⇒ Either[Throwable, T]): Either[Throwable, T] =
|
||||
try body catch {
|
||||
case e: InvocationTargetException ⇒
|
||||
e.getTargetException match {
|
||||
case NonFatal(t) ⇒ Left(t)
|
||||
case t ⇒ throw t
|
||||
}
|
||||
case NonFatal(e) ⇒ Left(e)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the default [[akka.actor.DynamicAccess]] implementation used by [[akka.actor.ActorSystemImpl]]
|
||||
* unless overridden. It uses reflection to turn fully-qualified class names into `Class[_]` objects
|
||||
* and creates instances from there using `getDeclaredConstructor()` and invoking that. The class loader
|
||||
* to be used for all this is determined by the [[akka.actor.ActorSystemImpl]]’s `findClassLoader` method
|
||||
* by default.
|
||||
*/
|
||||
class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {
|
||||
|
||||
override def getClassFor[T: ClassManifest](fqcn: String): Either[Throwable, Class[_ <: T]] =
|
||||
try {
|
||||
val c = classLoader.loadClass(fqcn).asInstanceOf[Class[_ <: T]]
|
||||
val t = classManifest[T].erasure
|
||||
if (t.isAssignableFrom(c)) Right(c) else Left(new ClassCastException(t + " is not assignable from " + c))
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ Left(e)
|
||||
}
|
||||
|
||||
override def createInstanceFor[T: ClassManifest](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] =
|
||||
getClassFor(fqcn).fold(Left(_), { c ⇒
|
||||
val types = args.map(_._1).toArray
|
||||
val values = args.map(_._2).toArray
|
||||
withErrorHandling {
|
||||
val constructor = c.getDeclaredConstructor(types: _*)
|
||||
constructor.setAccessible(true)
|
||||
val obj = constructor.newInstance(values: _*)
|
||||
val t = classManifest[T].erasure
|
||||
if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(fqcn + " is not a subtype of " + t))
|
||||
}
|
||||
})
|
||||
|
||||
override def getObjectFor[T: ClassManifest](fqcn: String): Either[Throwable, T] = {
|
||||
getClassFor(fqcn).fold(Left(_), { c ⇒
|
||||
withErrorHandling {
|
||||
val module = c.getDeclaredField("MODULE$")
|
||||
module.setAccessible(true)
|
||||
val t = classManifest[T].erasure
|
||||
module.get(null) match {
|
||||
case null ⇒ Left(new NullPointerException)
|
||||
case x if !t.isInstance(x) ⇒ Left(new ClassCastException(fqcn + " is not a subtype of " + t))
|
||||
case x ⇒ Right(x.asInstanceOf[T])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -3,8 +3,6 @@
|
|||
*/
|
||||
package akka.actor
|
||||
|
||||
import akka.util.ReflectiveAccess
|
||||
|
||||
/**
|
||||
* The basic ActorSystem covers all that is needed for locally running actors,
|
||||
* using futures and so on. In addition, more features can hook into it and
|
||||
|
|
@ -73,12 +71,12 @@ trait ExtensionIdProvider {
|
|||
|
||||
/**
|
||||
* This is a one-stop-shop if all you want is an extension which is
|
||||
* constructed with the ActorSystemImpl as its only constructor argument:
|
||||
* constructed with the ExtendedActorSystem as its only constructor argument:
|
||||
*
|
||||
* {{{
|
||||
* object MyExt extends ExtensionKey[Ext]
|
||||
*
|
||||
* class Ext(system: ActorSystemImpl) extends MyExt {
|
||||
* class Ext(system: ExtendedActorSystem) extends MyExt {
|
||||
* ...
|
||||
* }
|
||||
* }}}
|
||||
|
|
@ -89,7 +87,7 @@ trait ExtensionIdProvider {
|
|||
* public class MyExt extends Extension {
|
||||
* static final ExtensionKey<MyExt> key = new ExtensionKey<MyExt>(MyExt.class);
|
||||
*
|
||||
* public MyExt(ActorSystemImpl system) {
|
||||
* public MyExt(ExtendedActorSystem system) {
|
||||
* ...
|
||||
* }
|
||||
* }}}
|
||||
|
|
@ -99,7 +97,7 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassManifest[T]) extend
|
|||
|
||||
override def lookup(): ExtensionId[T] = this
|
||||
def createExtension(system: ExtendedActorSystem): T =
|
||||
ReflectiveAccess.createInstance[T](m.erasure, Array[Class[_]](classOf[ActorSystemImpl]), Array[AnyRef](system)) match {
|
||||
system.dynamicAccess.createInstanceFor[T](m.erasure, Seq(classOf[ExtendedActorSystem] -> system)) match {
|
||||
case Left(ex) ⇒ throw ex
|
||||
case Right(r) ⇒ r
|
||||
}
|
||||
|
|
|
|||
|
|
@ -129,7 +129,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
val serializedParameters = Array.ofDim[(Int, Class[_], Array[Byte])](ps.length)
|
||||
for (i ← 0 until ps.length) {
|
||||
val p = ps(i)
|
||||
val s = SerializationExtension(Serialization.currentSystem.value).findSerializerFor(p)
|
||||
val system = akka.serialization.JavaSerializer.currentSystem.value
|
||||
val s = SerializationExtension(system).findSerializerFor(p)
|
||||
val m = if (s.includeManifest) p.getClass else null
|
||||
serializedParameters(i) = (s.identifier, m, s toBinary parameters(i)) //Mutable for the sake of sanity
|
||||
}
|
||||
|
|
@ -146,7 +147,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
//TODO implement writeObject and readObject to serialize
|
||||
//TODO Possible optimization is to special encode the parameter-types to conserve space
|
||||
private def readResolve(): AnyRef = {
|
||||
val system = akka.serialization.Serialization.currentSystem.value
|
||||
val system = akka.serialization.JavaSerializer.currentSystem.value
|
||||
if (system eq null) throw new IllegalStateException(
|
||||
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
|
||||
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
|
||||
|
|
@ -158,7 +159,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
|
||||
for (i ← 0 until a.length) {
|
||||
val (sId, manifest, bytes) = a(i)
|
||||
deserializedParameters(i) = serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest))
|
||||
deserializedParameters(i) =
|
||||
serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest))
|
||||
}
|
||||
|
||||
deserializedParameters
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ import akka.actor.ActorSystem
|
|||
import scala.annotation.tailrec
|
||||
import akka.event.EventStream
|
||||
import com.typesafe.config.Config
|
||||
import akka.util.ReflectiveAccess
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.util.NonFatal
|
||||
import akka.event.Logging.LogEventException
|
||||
|
|
@ -26,7 +25,7 @@ final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorS
|
|||
val ser = SerializationExtension(system)
|
||||
ser.serialize(msg) match { //Verify serializability
|
||||
case Left(t) ⇒ throw t
|
||||
case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass, None) match { //Verify deserializability
|
||||
case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass) match { //Verify deserializability
|
||||
case Left(t) ⇒ throw t
|
||||
case _ ⇒ //All good
|
||||
}
|
||||
|
|
@ -369,7 +368,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
|||
}
|
||||
case fqcn ⇒
|
||||
val args = Seq(classOf[Config] -> config)
|
||||
ReflectiveAccess.createInstance[MailboxType](fqcn, args, prerequisites.classloader) match {
|
||||
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
|
||||
case Right(instance) ⇒ instance
|
||||
case Left(exception) ⇒
|
||||
throw new IllegalArgumentException(
|
||||
|
|
@ -385,8 +384,10 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
|||
case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
|
||||
case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
|
||||
case fqcn ⇒
|
||||
val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites])
|
||||
ReflectiveAccess.createInstance[ExecutorServiceConfigurator](fqcn, constructorSignature, Array[AnyRef](config, prerequisites), prerequisites.classloader) match {
|
||||
val args = Seq(
|
||||
classOf[Config] -> config,
|
||||
classOf[DispatcherPrerequisites] -> prerequisites)
|
||||
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args) match {
|
||||
case Right(instance) ⇒ instance
|
||||
case Left(exception) ⇒ throw new IllegalArgumentException(
|
||||
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
|
||||
|
|
|
|||
|
|
@ -4,22 +4,24 @@
|
|||
|
||||
package akka.dispatch
|
||||
|
||||
import akka.actor.newUuid
|
||||
import akka.util.{ Duration, ReflectiveAccess }
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.EventStream
|
||||
import akka.actor.Scheduler
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import java.util.concurrent.{ ConcurrentHashMap, TimeUnit, ThreadFactory }
|
||||
|
||||
import scala.collection.JavaConverters.mapAsJavaMapConverter
|
||||
|
||||
import com.typesafe.config.{ ConfigFactory, Config }
|
||||
|
||||
import Dispatchers.DefaultDispatcherId
|
||||
import akka.actor.{ Scheduler, DynamicAccess, ActorSystem }
|
||||
import akka.event.Logging.Warning
|
||||
import java.util.concurrent.{ ThreadFactory, TimeUnit, ConcurrentHashMap }
|
||||
import akka.event.EventStream
|
||||
import akka.util.Duration
|
||||
|
||||
trait DispatcherPrerequisites {
|
||||
def threadFactory: ThreadFactory
|
||||
def eventStream: EventStream
|
||||
def deadLetterMailbox: Mailbox
|
||||
def scheduler: Scheduler
|
||||
def classloader: ClassLoader
|
||||
def dynamicAccess: DynamicAccess
|
||||
}
|
||||
|
||||
case class DefaultDispatcherPrerequisites(
|
||||
|
|
@ -27,7 +29,7 @@ case class DefaultDispatcherPrerequisites(
|
|||
val eventStream: EventStream,
|
||||
val deadLetterMailbox: Mailbox,
|
||||
val scheduler: Scheduler,
|
||||
val classloader: ClassLoader) extends DispatcherPrerequisites
|
||||
val dynamicAccess: DynamicAccess) extends DispatcherPrerequisites
|
||||
|
||||
object Dispatchers {
|
||||
/**
|
||||
|
|
@ -137,7 +139,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
|
||||
case fqn ⇒
|
||||
val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
|
||||
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](fqn, args, prerequisites.classloader) match {
|
||||
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args) match {
|
||||
case Right(configurator) ⇒ configurator
|
||||
case Left(exception) ⇒
|
||||
throw new IllegalArgumentException(
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ package akka.event
|
|||
import akka.actor._
|
||||
import akka.AkkaException
|
||||
import akka.actor.ActorSystem.Settings
|
||||
import akka.util.ReflectiveAccess
|
||||
import akka.config.ConfigurationException
|
||||
import akka.util.ReentrantGuard
|
||||
import akka.util.duration._
|
||||
|
|
@ -101,7 +100,7 @@ trait LoggingBus extends ActorEventBus {
|
|||
if loggerName != StandardOutLoggerName
|
||||
} yield {
|
||||
try {
|
||||
ReflectiveAccess.getClassFor[Actor](loggerName, system.internalClassLoader) match {
|
||||
system.dynamicAccess.getClassFor[Actor](loggerName) match {
|
||||
case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName)
|
||||
case Left(exception) ⇒ throw exception
|
||||
}
|
||||
|
|
@ -350,7 +349,7 @@ object Logging {
|
|||
|
||||
object Extension extends ExtensionKey[LogExt]
|
||||
|
||||
class LogExt(system: ActorSystemImpl) extends Extension {
|
||||
class LogExt(system: ExtendedActorSystem) extends Extension {
|
||||
private val loggerId = new AtomicInteger
|
||||
def id() = loggerId.incrementAndGet()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,103 +0,0 @@
|
|||
package akka.serialization
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
import akka.actor.Actor
|
||||
|
||||
/**
|
||||
* trait Serializer {
|
||||
* @volatile
|
||||
* var classLoader: Option[ClassLoader] = None
|
||||
* def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass))
|
||||
*
|
||||
* def toBinary(obj: AnyRef): Array[Byte]
|
||||
* def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
|
||||
* }
|
||||
*/
|
||||
|
||||
/**
|
||||
*
|
||||
* object Format {
|
||||
* implicit object Default extends Serializer {
|
||||
* import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }
|
||||
* //import org.apache.commons.io.input.ClassLoaderObjectInputStream
|
||||
*
|
||||
* def toBinary(obj: AnyRef): Array[Byte] = {
|
||||
* val bos = new ByteArrayOutputStream
|
||||
* val out = new ObjectOutputStream(bos)
|
||||
* out.writeObject(obj)
|
||||
* out.close()
|
||||
* bos.toByteArray
|
||||
* }
|
||||
*
|
||||
* def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = {
|
||||
* val in =
|
||||
* //if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else
|
||||
* new ObjectInputStream(new ByteArrayInputStream(bytes))
|
||||
* val obj = in.readObject
|
||||
* in.close()
|
||||
* obj
|
||||
* }
|
||||
*
|
||||
* def identifier: Byte = 111 //Pick a number and hope no one has chosen the same :-) 0 - 16 is reserved for Akka internals
|
||||
*
|
||||
* }
|
||||
*
|
||||
* val defaultSerializerName = Default.getClass.getName
|
||||
* }
|
||||
*/
|
||||
|
||||
trait FromBinary[T <: Actor] {
|
||||
def fromBinary(bytes: Array[Byte], act: T): T
|
||||
}
|
||||
|
||||
trait ToBinary[T <: Actor] {
|
||||
def toBinary(t: T): Array[Byte]
|
||||
}
|
||||
|
||||
/**
|
||||
* Type class definition for Actor Serialization.
|
||||
* Client needs to implement Format[] for the respective actor.
|
||||
*/
|
||||
trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
|
||||
|
||||
/**
|
||||
* A default implementation for a stateless actor
|
||||
*
|
||||
* Create a Format object with the client actor as the implementation of the type class
|
||||
*
|
||||
* <pre>
|
||||
* object BinaryFormatMyStatelessActor {
|
||||
* implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
trait StatelessActorFormat[T <: Actor] extends Format[T] {
|
||||
def fromBinary(bytes: Array[Byte], act: T) = act
|
||||
|
||||
def toBinary(ac: T) = Array.empty[Byte]
|
||||
}
|
||||
|
||||
/**
|
||||
* A default implementation of the type class for a Format that specifies a serializer
|
||||
*
|
||||
* Create a Format object with the client actor as the implementation of the type class and
|
||||
* a serializer object
|
||||
*
|
||||
* <pre>
|
||||
* object BinaryFormatMyJavaSerializableActor {
|
||||
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
|
||||
* val serializer = Serializers.Java
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
|
||||
val serializer: Serializer
|
||||
|
||||
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T]
|
||||
|
||||
def toBinary(ac: T) = serializer.toBinary(ac)
|
||||
}
|
||||
|
|
@ -5,12 +5,12 @@
|
|||
package akka.serialization
|
||||
|
||||
import akka.AkkaException
|
||||
import akka.util.ReflectiveAccess
|
||||
import scala.util.DynamicVariable
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address }
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess }
|
||||
import akka.event.Logging
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.util.NonFatal
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import java.io.NotSerializableException
|
||||
|
||||
|
|
@ -23,19 +23,6 @@ object Serialization {
|
|||
*/
|
||||
type ClassSerializer = (Class[_], Serializer)
|
||||
|
||||
/**
|
||||
* This holds a reference to the current ActorSystem (the surrounding context)
|
||||
* during serialization and deserialization.
|
||||
*
|
||||
* If you are using Serializers yourself, outside of SerializationExtension,
|
||||
* you'll need to surround the serialization/deserialization with:
|
||||
*
|
||||
* currentSystem.withValue(system) {
|
||||
* ...code...
|
||||
* }
|
||||
*/
|
||||
val currentSystem = new DynamicVariable[ActorSystem](null)
|
||||
|
||||
/**
|
||||
* This holds a reference to the current transport address to be inserted
|
||||
* into local actor refs during serialization.
|
||||
|
|
@ -71,8 +58,9 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
|
||||
* to either an Array of Bytes or an Exception if one was thrown.
|
||||
*/
|
||||
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
|
||||
try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception ⇒ Left(e) }
|
||||
def serialize(o: AnyRef): Either[Throwable, Array[Byte]] =
|
||||
try Right(findSerializerFor(o).toBinary(o))
|
||||
catch { case NonFatal(e) ⇒ Left(e) }
|
||||
|
||||
/**
|
||||
* Deserializes the given array of bytes using the specified serializer id,
|
||||
|
|
@ -81,26 +69,18 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
*/
|
||||
def deserialize(bytes: Array[Byte],
|
||||
serializerId: Int,
|
||||
clazz: Option[Class[_]],
|
||||
classLoader: ClassLoader): Either[Exception, AnyRef] =
|
||||
try {
|
||||
currentSystem.withValue(system) {
|
||||
Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, Some(classLoader)))
|
||||
}
|
||||
} catch { case e: Exception ⇒ Left(e) }
|
||||
clazz: Option[Class[_]]): Either[Throwable, AnyRef] =
|
||||
try Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
|
||||
catch { case NonFatal(e) ⇒ Left(e) }
|
||||
|
||||
/**
|
||||
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
|
||||
* You can specify an optional ClassLoader to load the object into.
|
||||
* Returns either the resulting object or an Exception if one was thrown.
|
||||
*/
|
||||
def deserialize(
|
||||
bytes: Array[Byte],
|
||||
clazz: Class[_],
|
||||
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
|
||||
try {
|
||||
currentSystem.withValue(system) { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) }
|
||||
} catch { case e: Exception ⇒ Left(e) }
|
||||
def deserialize(bytes: Array[Byte], clazz: Class[_]): Either[Throwable, AnyRef] =
|
||||
try Right(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
|
||||
catch { case NonFatal(e) ⇒ Left(e) }
|
||||
|
||||
/**
|
||||
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null.
|
||||
|
|
@ -149,10 +129,12 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
}
|
||||
|
||||
/**
|
||||
* Tries to instantiate the specified Serializer by the FQN
|
||||
* Tries to load the specified Serializer by the fully-qualified name; the actual
|
||||
* loading is performed by the system’s [[akka.actor.DynamicAccess]].
|
||||
*/
|
||||
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
|
||||
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs, system.internalClassLoader)
|
||||
def serializerOf(serializerFQN: String): Either[Throwable, Serializer] =
|
||||
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)).fold(_ ⇒
|
||||
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()), Right(_))
|
||||
|
||||
/**
|
||||
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
|
||||
|
|
@ -169,7 +151,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
*/
|
||||
private[akka] val bindings: Seq[ClassSerializer] = {
|
||||
val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield {
|
||||
val c = ReflectiveAccess.getClassFor(k, system.internalClassLoader).fold(throw _, identity[Class[_]])
|
||||
val c = system.dynamicAccess.getClassFor(k).fold(throw _, identity[Class[_]])
|
||||
(c, serializers(v))
|
||||
}
|
||||
sort(configuredBindings)
|
||||
|
|
|
|||
|
|
@ -6,11 +6,31 @@ package akka.serialization
|
|||
|
||||
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }
|
||||
import akka.util.ClassLoaderObjectInputStream
|
||||
import akka.actor.DynamicAccess
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import scala.util.DynamicVariable
|
||||
|
||||
/**
|
||||
* A Serializer represents a bimap between an object and an array of bytes representing that object
|
||||
* A Serializer represents a bimap between an object and an array of bytes representing that object.
|
||||
*
|
||||
* Serializers are loaded using reflection during [[akka.actor.ActorSystem]]
|
||||
* start-up, where two constructors are tried in order:
|
||||
*
|
||||
* <ul>
|
||||
* <li>taking exactly one argument of type [[akka.actor.ExtendedActorSystem]];
|
||||
* this should be the preferred one because all reflective loading of classes
|
||||
* during deserialization should use ExtendedActorSystem.dynamicAccess (see
|
||||
* [[akka.actor.DynamicAccess]]), and</li>
|
||||
* <li>without arguments, which is only an option if the serializer does not
|
||||
* load classes using reflection.</li>
|
||||
* </ul>
|
||||
*
|
||||
* <b>Be sure to always use the PropertyManager for loading classes!</b> This is necessary to
|
||||
* avoid strange match errors and inequalities which arise from different class loaders loading
|
||||
* the same class.
|
||||
*/
|
||||
trait Serializer {
|
||||
|
||||
/**
|
||||
* Completely unique value to identify this implementation of Serializer, used to optimize network traffic
|
||||
* Values from 0 to 16 is reserved for Akka internal usage
|
||||
|
|
@ -28,42 +48,61 @@ trait Serializer {
|
|||
def includeManifest: Boolean
|
||||
|
||||
/**
|
||||
* Deserializes the given Array of Bytes into an AnyRef
|
||||
* Produces an object from an array of bytes, with an optional type-hint;
|
||||
* the class should be loaded using ActorSystem.dynamicAccess.
|
||||
*/
|
||||
def fromBinary(bytes: Array[Byte]): AnyRef = fromBinary(bytes, None, None)
|
||||
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef
|
||||
|
||||
/**
|
||||
* Deserializes the given Array of Bytes into an AnyRef with an optional type hint
|
||||
* Java API: deserialize without type hint
|
||||
*/
|
||||
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = fromBinary(bytes, manifest, None)
|
||||
final def fromBinary(bytes: Array[Byte]): AnyRef = fromBinary(bytes, None)
|
||||
|
||||
/**
|
||||
* Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into
|
||||
* Java API: deserialize with type hint
|
||||
*/
|
||||
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]], classLoader: Option[ClassLoader]): AnyRef
|
||||
final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz))
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API for creating a Serializer
|
||||
* Java API for creating a Serializer: make sure to include a constructor which
|
||||
* takes exactly one argument of type [[akka.actor.ExtendedActorSystem]], because
|
||||
* that is the preferred constructor which will be invoked when reflectively instantiating
|
||||
* the JSerializer (also possible with empty constructor).
|
||||
*/
|
||||
abstract class JSerializer extends Serializer {
|
||||
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef =
|
||||
fromBinary(bytes, manifest.orNull, classLoader.orNull)
|
||||
final def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
|
||||
fromBinaryJava(bytes, manifest.orNull)
|
||||
|
||||
/**
|
||||
* This method should be overridden,
|
||||
* manifest and classLoader may be null.
|
||||
* This method must be implemented, manifest may be null.
|
||||
*/
|
||||
def fromBinary(bytes: Array[Byte], manifest: Class[_], classLoader: ClassLoader): AnyRef
|
||||
protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef
|
||||
}
|
||||
|
||||
object JavaSerializer extends JavaSerializer
|
||||
object NullSerializer extends NullSerializer
|
||||
|
||||
object JavaSerializer {
|
||||
|
||||
/**
|
||||
* This holds a reference to the current ActorSystem (the surrounding context)
|
||||
* during serialization and deserialization.
|
||||
*
|
||||
* If you are using Serializers yourself, outside of SerializationExtension,
|
||||
* you'll need to surround the serialization/deserialization with:
|
||||
*
|
||||
* currentSystem.withValue(system) {
|
||||
* ...code...
|
||||
* }
|
||||
*/
|
||||
val currentSystem = new DynamicVariable[ExtendedActorSystem](null)
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This Serializer uses standard Java Serialization
|
||||
*/
|
||||
class JavaSerializer extends Serializer {
|
||||
class JavaSerializer(val system: ExtendedActorSystem) extends Serializer {
|
||||
|
||||
def includeManifest: Boolean = false
|
||||
|
||||
|
|
@ -77,12 +116,11 @@ class JavaSerializer extends Serializer {
|
|||
bos.toByteArray
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None,
|
||||
classLoader: Option[ClassLoader] = None): AnyRef = {
|
||||
val in =
|
||||
if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else
|
||||
new ObjectInputStream(new ByteArrayInputStream(bytes))
|
||||
val obj = in.readObject
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
|
||||
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new ByteArrayInputStream(bytes))
|
||||
val obj = JavaSerializer.currentSystem.withValue(system) {
|
||||
in.readObject
|
||||
}
|
||||
in.close()
|
||||
obj
|
||||
}
|
||||
|
|
@ -96,5 +134,5 @@ class NullSerializer extends Serializer {
|
|||
def includeManifest: Boolean = false
|
||||
def identifier = 0
|
||||
def toBinary(o: AnyRef) = nullAsBytes
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = null
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
|
||||
}
|
||||
|
|
|
|||
30
akka-actor/src/main/scala/akka/util/Reflect.scala
Normal file
30
akka-actor/src/main/scala/akka/util/Reflect.scala
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util
|
||||
|
||||
/**
|
||||
* Collection of internal reflection utilities which may or may not be
|
||||
* available (most services specific to HotSpot, but fails gracefully).
|
||||
*/
|
||||
object Reflect {
|
||||
|
||||
/**
|
||||
* This optionally holds a function which looks N levels above itself
|
||||
* on the call stack and returns the `Class[_]` object for the code
|
||||
* executing in that stack frame. Implemented using
|
||||
* `sun.reflect.Reflection.getCallerClass` if available, None otherwise.
|
||||
*
|
||||
* Hint: when comparing to Thread.currentThread.getStackTrace, add two levels.
|
||||
*/
|
||||
val getCallerClass: Option[Int ⇒ Class[_]] = {
|
||||
try {
|
||||
val c = Class.forName("sun.reflect.Reflection");
|
||||
val m = c.getMethod("getCallerClass", Array(classOf[Int]): _*)
|
||||
Some((i: Int) ⇒ m.invoke(null, Array[AnyRef](i.asInstanceOf[java.lang.Integer]): _*).asInstanceOf[Class[_]])
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ None
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,126 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
|
||||
object ReflectiveAccess {
|
||||
|
||||
val loader = getClass.getClassLoader
|
||||
val noParams: Array[Class[_]] = Array()
|
||||
val noArgs: Array[AnyRef] = Array()
|
||||
|
||||
def createInstance[T](clazz: Class[_],
|
||||
params: Array[Class[_]],
|
||||
args: Array[AnyRef]): Either[Exception, T] = withErrorHandling {
|
||||
assert(clazz ne null)
|
||||
assert(params ne null)
|
||||
assert(args ne null)
|
||||
val ctor = clazz.getDeclaredConstructor(params: _*)
|
||||
ctor.setAccessible(true)
|
||||
Right(ctor.newInstance(args: _*).asInstanceOf[T])
|
||||
}
|
||||
|
||||
def createInstance[T](fqn: String,
|
||||
params: Array[Class[_]],
|
||||
args: Array[AnyRef],
|
||||
classloader: ClassLoader = loader): Either[Exception, T] = withErrorHandling {
|
||||
assert(params ne null)
|
||||
assert(args ne null)
|
||||
getClassFor(fqn, classloader) match {
|
||||
case Right(value) ⇒
|
||||
val ctor = value.getDeclaredConstructor(params: _*)
|
||||
ctor.setAccessible(true)
|
||||
Right(ctor.newInstance(args: _*).asInstanceOf[T])
|
||||
case Left(exception) ⇒ Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
|
||||
}
|
||||
}
|
||||
|
||||
def createInstance[T](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Exception, T] =
|
||||
createInstance(clazz, args.map(_._1).toArray, args.map(_._2).toArray)
|
||||
|
||||
def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)], classloader: ClassLoader): Either[Exception, T] =
|
||||
createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, classloader)
|
||||
|
||||
def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Exception, T] =
|
||||
createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, loader)
|
||||
|
||||
//Obtains a reference to fqn.MODULE$
|
||||
def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try {
|
||||
getClassFor(fqn, classloader) match {
|
||||
case Right(value) ⇒
|
||||
val instance = value.getDeclaredField("MODULE$")
|
||||
instance.setAccessible(true)
|
||||
val obj = instance.get(null)
|
||||
if (obj eq null) Left(new NullPointerException) else Right(obj.asInstanceOf[T])
|
||||
case Left(exception) ⇒ Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
Left(e)
|
||||
}
|
||||
|
||||
def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, Class[T]] = try {
|
||||
assert(fqn ne null)
|
||||
|
||||
// First, use the specified CL
|
||||
val first = try {
|
||||
Right(classloader.loadClass(fqn).asInstanceOf[Class[T]])
|
||||
} catch {
|
||||
case c: ClassNotFoundException ⇒ Left(c)
|
||||
}
|
||||
|
||||
if (first.isRight) first
|
||||
else {
|
||||
// Second option is to use the ContextClassLoader
|
||||
val second = try {
|
||||
Right(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
|
||||
} catch {
|
||||
case c: ClassNotFoundException ⇒ Left(c)
|
||||
}
|
||||
|
||||
if (second.isRight) second
|
||||
else {
|
||||
val third = try {
|
||||
if (classloader ne loader) Right(loader.loadClass(fqn).asInstanceOf[Class[T]]) else Left(null) //Horrid
|
||||
} catch {
|
||||
case c: ClassNotFoundException ⇒ Left(c)
|
||||
}
|
||||
|
||||
if (third.isRight) third
|
||||
else {
|
||||
try {
|
||||
Right(Class.forName(fqn).asInstanceOf[Class[T]]) // Last option is Class.forName
|
||||
} catch {
|
||||
case c: ClassNotFoundException ⇒ Left(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒ Left(e)
|
||||
}
|
||||
|
||||
/**
|
||||
* Caught exception is returned as Left(exception).
|
||||
* Unwraps `InvocationTargetException` if its getTargetException is an `Exception`.
|
||||
* Other `Throwable`, such as `Error` is thrown.
|
||||
*/
|
||||
@inline
|
||||
private final def withErrorHandling[T](body: ⇒ Either[Exception, T]): Either[Exception, T] = {
|
||||
try {
|
||||
body
|
||||
} catch {
|
||||
case e: InvocationTargetException ⇒ e.getTargetException match {
|
||||
case t: Exception ⇒ Left(t)
|
||||
case t ⇒ throw t
|
||||
}
|
||||
case e: Exception ⇒
|
||||
Left(e)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package akka.docs.serialization;
|
||||
|
||||
import akka.japi.Option;
|
||||
import akka.serialization.JSerializer;
|
||||
import akka.serialization.Serialization;
|
||||
import akka.serialization.SerializationExtension;
|
||||
|
|
@ -43,10 +44,8 @@ public class SerializationDocTestBase {
|
|||
|
||||
// "fromBinary" deserializes the given array,
|
||||
// using the type hint (if any, see "includeManifest" above)
|
||||
// into the optionally provided classLoader.
|
||||
@Override public Object fromBinary(byte[] bytes,
|
||||
Class clazz,
|
||||
ClassLoader classLoader) {
|
||||
@Override public Object fromBinaryJava(byte[] bytes,
|
||||
Class<?> clazz) {
|
||||
// Put your code that deserializes here
|
||||
//#...
|
||||
return null;
|
||||
|
|
|
|||
|
|
@ -103,3 +103,15 @@ which is done by extending ``akka.serialization.JSerializer``, like this:
|
|||
|
||||
Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then
|
||||
list which classes that should be serialized using it.
|
||||
|
||||
A Word About Java Serialization
|
||||
===============================
|
||||
|
||||
When using Java serialization without employing the :class:`JavaSerializer` for
|
||||
the task, you must make sure to supply a valid :class:`ExtendedActorSystem` in
|
||||
the dynamic variable ``JavaSerializer.currentSystem``. This is used when
|
||||
reading in the representation of an :class:`ActorRef` for turning the string
|
||||
representation into a real reference. :class:`DynamicVariable` is a
|
||||
thread-local variable, so be sure to have it set while deserializing anything
|
||||
which might contain actor references.
|
||||
|
||||
|
|
|
|||
|
|
@ -35,8 +35,7 @@ class MyOwnSerializer extends Serializer {
|
|||
// using the type hint (if any, see "includeManifest" above)
|
||||
// into the optionally provided classLoader.
|
||||
def fromBinary(bytes: Array[Byte],
|
||||
clazz: Option[Class[_]],
|
||||
classLoader: Option[ClassLoader] = None): AnyRef = {
|
||||
clazz: Option[Class[_]]): AnyRef = {
|
||||
// Put your code that deserializes here
|
||||
//#...
|
||||
null
|
||||
|
|
@ -143,9 +142,7 @@ class SerializationDocSpec extends AkkaSpec {
|
|||
val bytes = serializer.toBinary(original)
|
||||
|
||||
// Turn it back into an object
|
||||
val back = serializer.fromBinary(bytes,
|
||||
manifest = None,
|
||||
classLoader = None)
|
||||
val back = serializer.fromBinary(bytes, manifest = None)
|
||||
|
||||
// Voilá!
|
||||
back must equal(original)
|
||||
|
|
|
|||
|
|
@ -71,14 +71,14 @@ object ZeromqDocSpec {
|
|||
def receive = {
|
||||
// the first frame is the topic, second is the message
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Heap], None) match {
|
||||
ser.deserialize(m.payload(1), classOf[Heap]) match {
|
||||
case Right(Heap(timestamp, used, max)) ⇒
|
||||
log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.load" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Load], None) match {
|
||||
ser.deserialize(m.payload(1), classOf[Load]) match {
|
||||
case Right(Load(timestamp, loadAverage)) ⇒
|
||||
log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
|
||||
case Left(e) ⇒ throw e
|
||||
|
|
@ -97,7 +97,7 @@ object ZeromqDocSpec {
|
|||
def receive = {
|
||||
// the first frame is the topic, second is the message
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Heap], None) match {
|
||||
ser.deserialize(m.payload(1), classOf[Heap]) match {
|
||||
case Right(Heap(timestamp, used, max)) ⇒
|
||||
if ((used.toDouble / max) > 0.9) count += 1
|
||||
else count = 0
|
||||
|
|
|
|||
|
|
@ -101,3 +101,15 @@ First you need to create a class definition of your ``Serializer`` like so:
|
|||
|
||||
Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then
|
||||
list which classes that should be serialized using it.
|
||||
|
||||
A Word About Java Serialization
|
||||
===============================
|
||||
|
||||
When using Java serialization without employing the :class:`JavaSerializer` for
|
||||
the task, you must make sure to supply a valid :class:`ExtendedActorSystem` in
|
||||
the dynamic variable ``JavaSerializer.currentSystem``. This is used when
|
||||
reading in the representation of an :class:`ActorRef` for turning the string
|
||||
representation into a real reference. :class:`DynamicVariable` is a
|
||||
thread-local variable, so be sure to have it set while deserializing anything
|
||||
which might contain actor references.
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
|
|||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class BeanstalkBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
|
||||
class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
|
||||
|
||||
private val settings = BeanstalkBasedMailboxExtension(owner.system)
|
||||
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ class FileBasedMailboxType(config: Config) extends MailboxType {
|
|||
override def create(owner: ActorContext) = new FileBasedMailbox(owner)
|
||||
}
|
||||
|
||||
class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
|
||||
class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
|
||||
|
||||
val log = Logging(system, "FileBasedMailbox")
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ private[akka] object DurableExecutableMailboxConfig {
|
|||
val Name = "[\\.\\/\\$\\s]".r
|
||||
}
|
||||
|
||||
abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue {
|
||||
abstract class DurableMailbox(val owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue {
|
||||
import DurableExecutableMailboxConfig._
|
||||
|
||||
def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem]
|
||||
|
|
@ -22,15 +22,13 @@ abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner)
|
|||
|
||||
}
|
||||
|
||||
trait DurableMessageSerialization {
|
||||
|
||||
def owner: ActorContext
|
||||
trait DurableMessageSerialization { this: DurableMailbox ⇒
|
||||
|
||||
def serialize(durableMessage: Envelope): Array[Byte] = {
|
||||
|
||||
def serializeActorRef(ref: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(ref.path.toString).build
|
||||
|
||||
val message = MessageSerializer.serialize(owner.system, durableMessage.message.asInstanceOf[AnyRef])
|
||||
val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef])
|
||||
val builder = RemoteMessageProtocol.newBuilder
|
||||
.setMessage(message)
|
||||
.setRecipient(serializeActorRef(owner.self))
|
||||
|
|
@ -41,13 +39,13 @@ trait DurableMessageSerialization {
|
|||
|
||||
def deserialize(bytes: Array[Byte]): Envelope = {
|
||||
|
||||
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = owner.system.actorFor(refProtocol.getPath)
|
||||
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = system.actorFor(refProtocol.getPath)
|
||||
|
||||
val durableMessage = RemoteMessageProtocol.parseFrom(bytes)
|
||||
val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage, getClass.getClassLoader)
|
||||
val message = MessageSerializer.deserialize(system, durableMessage.getMessage)
|
||||
val sender = deserializeActorRef(durableMessage.getSender)
|
||||
|
||||
new Envelope(message, sender)(owner.system)
|
||||
new Envelope(message, sender)(system)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB
|
|||
val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument]
|
||||
system.log.debug("Deserializing a durable message from MongoDB: {}", doc)
|
||||
val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData)
|
||||
val msg = MessageSerializer.deserialize(system, msgData, system.internalClassLoader)
|
||||
val msg = MessageSerializer.deserialize(system, msgData)
|
||||
val ownerPath = doc.as[String]("ownerPath")
|
||||
val senderPath = doc.as[String]("senderPath")
|
||||
val sender = system.actorFor(senderPath)
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ class MongoBasedMailboxType(config: Config) extends MailboxType {
|
|||
*
|
||||
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
|
||||
*/
|
||||
class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) {
|
||||
class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) {
|
||||
// this implicit object provides the context for reading/writing things as MongoDurableMessage
|
||||
implicit val mailboxBSONSer = new BSONSerializableMailbox(system)
|
||||
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class RedisBasedMailboxType(config: Config) extends MailboxType {
|
|||
override def create(owner: ActorContext) = new RedisBasedMailbox(owner)
|
||||
}
|
||||
|
||||
class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
|
||||
class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
|
||||
|
||||
private val settings = RedisBasedMailboxExtension(owner.system)
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class ZooKeeperBasedMailboxType(config: Config) extends MailboxType {
|
|||
override def create(owner: ActorContext) = new ZooKeeperBasedMailbox(owner)
|
||||
}
|
||||
|
||||
class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
|
||||
class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
|
||||
|
||||
private val settings = ZooKeeperBasedMailboxExtension(owner.system)
|
||||
val queueNode = "/queues"
|
||||
|
|
|
|||
|
|
@ -6,32 +6,25 @@ package akka.remote
|
|||
|
||||
import akka.remote.RemoteProtocol._
|
||||
import com.google.protobuf.ByteString
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.util.ReflectiveAccess
|
||||
|
||||
object MessageSerializer {
|
||||
|
||||
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: ClassLoader): AnyRef = {
|
||||
val clazz = if (messageProtocol.hasMessageManifest) {
|
||||
Option(ReflectiveAccess.getClassFor[AnyRef](
|
||||
messageProtocol.getMessageManifest.toStringUtf8,
|
||||
classLoader) match {
|
||||
case Left(e) ⇒ throw e
|
||||
case Right(r) ⇒ r
|
||||
})
|
||||
} else None
|
||||
SerializationExtension(system).deserialize(
|
||||
messageProtocol.getMessage.toByteArray,
|
||||
messageProtocol.getSerializerId,
|
||||
clazz,
|
||||
classLoader) match {
|
||||
def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = {
|
||||
val clazz =
|
||||
if (messageProtocol.hasMessageManifest) {
|
||||
system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8)
|
||||
.fold(throw _, Some(_))
|
||||
} else None
|
||||
SerializationExtension(system)
|
||||
.deserialize(messageProtocol.getMessage.toByteArray, messageProtocol.getSerializerId, clazz) match {
|
||||
case Left(e) ⇒ throw e
|
||||
case Right(r) ⇒ r
|
||||
}
|
||||
}
|
||||
|
||||
def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = {
|
||||
def serialize(system: ExtendedActorSystem, message: AnyRef): MessageProtocol = {
|
||||
val s = SerializationExtension(system)
|
||||
val serializer = s.findSerializerFor(message)
|
||||
val builder = MessageProtocol.newBuilder
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ import akka.event.EventStream
|
|||
import akka.config.ConfigurationException
|
||||
import java.util.concurrent.{ TimeoutException }
|
||||
import com.typesafe.config.Config
|
||||
import akka.util.ReflectiveAccess
|
||||
import akka.serialization.Serialization
|
||||
import akka.serialization.SerializationExtension
|
||||
|
||||
|
|
@ -28,11 +27,11 @@ class RemoteActorRefProvider(
|
|||
val settings: ActorSystem.Settings,
|
||||
val eventStream: EventStream,
|
||||
val scheduler: Scheduler,
|
||||
val classloader: ClassLoader) extends ActorRefProvider {
|
||||
val dynamicAccess: DynamicAccess) extends ActorRefProvider {
|
||||
|
||||
val remoteSettings = new RemoteSettings(settings.config, systemName)
|
||||
|
||||
val deployer = new RemoteDeployer(settings, classloader)
|
||||
val deployer = new RemoteDeployer(settings, dynamicAccess)
|
||||
|
||||
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer)
|
||||
|
||||
|
|
@ -84,7 +83,7 @@ class RemoteActorRefProvider(
|
|||
classOf[ActorSystemImpl] -> system,
|
||||
classOf[RemoteActorRefProvider] -> this)
|
||||
|
||||
ReflectiveAccess.createInstance[RemoteTransport](fqn, args, system.internalClassLoader) match {
|
||||
system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match {
|
||||
case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem)
|
||||
case Right(remote) ⇒ remote
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ case class RemoteScope(node: Address) extends Scope {
|
|||
def withFallback(other: Scope): Scope = this
|
||||
}
|
||||
|
||||
class RemoteDeployer(_settings: ActorSystem.Settings, _classloader: ClassLoader) extends Deployer(_settings, _classloader) {
|
||||
class RemoteDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends Deployer(_settings, _pm) {
|
||||
|
||||
override protected def parseConfig(path: String, config: Config): Option[Deploy] = {
|
||||
import scala.collection.JavaConverters._
|
||||
|
|
|
|||
|
|
@ -219,7 +219,7 @@ class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) {
|
|||
|
||||
lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver)
|
||||
|
||||
lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, getClass.getClassLoader)
|
||||
lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage)
|
||||
|
||||
override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package akka.serialization
|
||||
|
||||
import com.google.protobuf.Message
|
||||
import akka.actor.DynamicAccess
|
||||
|
||||
/**
|
||||
* This Serializer serializes `com.google.protobuf.Message`s
|
||||
|
|
@ -19,7 +20,7 @@ class ProtobufSerializer extends Serializer {
|
|||
case _ ⇒ throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]")
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef =
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
|
||||
clazz match {
|
||||
case None ⇒ throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
|
||||
case Some(c) ⇒ c.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.testkit
|
||||
|
||||
import akka.actor._
|
||||
import akka.util.{ ReflectiveAccess, Duration }
|
||||
import akka.util.Duration
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import scala.collection.immutable.Stack
|
||||
import akka.dispatch._
|
||||
|
|
@ -121,8 +121,7 @@ object TestActorRef {
|
|||
def apply[T <: Actor](implicit m: Manifest[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)
|
||||
|
||||
def apply[T <: Actor](name: String)(implicit m: Manifest[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({
|
||||
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
||||
createInstance[T](m.erasure, noParams, noArgs) match {
|
||||
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](m.erasure, Seq()) match {
|
||||
case Right(value) ⇒ value
|
||||
case Left(exception) ⇒ throw new ActorInitializationException(null,
|
||||
"Could not instantiate Actor" +
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue