diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java index 28b87bb5db..f8f22ec8c5 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java @@ -40,9 +40,9 @@ public class JavaExtension { static final ExtensionKey key = new ExtensionKey(OtherExtension.class) { }; - public final ActorSystemImpl system; + public final ExtendedActorSystem system; - public OtherExtension(ActorSystemImpl i) { + public OtherExtension(ExtendedActorSystem i) { system = i; } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index a2c3c7da5a..e8c667bc7e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -11,11 +11,10 @@ import akka.testkit._ import akka.util.Timeout import akka.util.duration._ import java.lang.IllegalStateException -import akka.util.ReflectiveAccess -import akka.serialization.Serialization import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.dispatch.{ Await, DefaultPromise, Promise, Future } import akka.pattern.ask +import akka.serialization.JavaSerializer object ActorRefSpec { @@ -240,6 +239,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { "be serializable using Java Serialization on local node" in { val a = system.actorOf(Props[InnerActor]) + val esys = system.asInstanceOf[ExtendedActorSystem] import java.io._ @@ -251,14 +251,21 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { out.flush out.close - Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { - val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) + val bytes = baos.toByteArray + + JavaSerializer.currentSystem.withValue(esys) { + val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) val readA = in.readObject a.isInstanceOf[LocalActorRef] must be === true readA.isInstanceOf[LocalActorRef] must be === true (readA eq a) must be === true } + + val ser = new JavaSerializer(esys) + val readA = ser.fromBinary(bytes, None) + readA.isInstanceOf[LocalActorRef] must be === true + (readA eq a) must be === true } "throw an exception on deserialize if no system in scope" in { @@ -297,7 +304,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { out.flush out.close - Serialization.currentSystem.withValue(sysImpl) { + JavaSerializer.currentSystem.withValue(sysImpl) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index b83fe78338..ee15e380c2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.util.Duration import akka.util.Timeout import akka.util.duration._ -import akka.serialization.Serialization import java.util.concurrent.atomic.AtomicReference import annotation.tailrec import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } @@ -19,6 +18,7 @@ import akka.japi.{ Creator, Option ⇒ JOption } import akka.testkit.DefaultTimeout import akka.dispatch.{ Await, Dispatchers, Future, Promise } import akka.pattern.ask +import akka.serialization.JavaSerializer object TypedActorSpec { @@ -367,7 +367,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) "be able to serialize and deserialize invocations" in { import java.io._ - Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { + JavaSerializer.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]()) val baos = new ByteArrayOutputStream(8192 * 4) val out = new ObjectOutputStream(baos) @@ -386,7 +386,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) "be able to serialize and deserialize invocations' parameters" in { import java.io._ val someFoo: Foo = new Bar - Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { + JavaSerializer.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef])) val baos = new ByteArrayOutputStream(8192 * 4) val out = new ObjectOutputStream(baos) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index df2170905e..4ddf774064 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -78,7 +78,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { case Left(exception) ⇒ fail(exception) case Right(bytes) ⇒ bytes } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Address], None) match { + deserialize(b.asInstanceOf[Array[Byte]], classOf[Address]) match { case Left(exception) ⇒ fail(exception) case Right(add) ⇒ assert(add === addr) } @@ -90,7 +90,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { case Left(exception) ⇒ fail(exception) case Right(bytes) ⇒ bytes } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Person], None) match { + deserialize(b.asInstanceOf[Array[Byte]], classOf[Person]) match { case Left(exception) ⇒ fail(exception) case Right(p) ⇒ assert(p === person) } @@ -103,7 +103,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { case Left(exception) ⇒ fail(exception) case Right(bytes) ⇒ bytes } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Record], None) match { + deserialize(b.asInstanceOf[Array[Byte]], classOf[Record]) match { case Left(exception) ⇒ fail(exception) case Right(p) ⇒ assert(p === r) } @@ -135,7 +135,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { out.close() val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray)) - Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) { + JavaSerializer.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) { val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef] (deadLetters eq a.deadLetters) must be(true) } @@ -248,8 +248,5 @@ class TestSerializer extends Serializer { Array.empty[Byte] } - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, - classLoader: Option[ClassLoader] = None): AnyRef = { - null - } + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index f86f64fa99..a854258453 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -220,7 +220,7 @@ private[akka] class ActorCell( val ser = SerializationExtension(system) ser.serialize(props.creator) match { case Left(t) ⇒ throw t - case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass, None) match { + case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match { case Left(t) ⇒ throw t case _ ⇒ //All good } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 38e8ab679f..90154a829b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -334,7 +334,7 @@ private[akka] class LocalActorRef private[akka] ( * Memento pattern for serializing ActorRefs transparently */ case class SerializedActorRef private (path: String) { - import akka.serialization.Serialization.currentSystem + import akka.serialization.JavaSerializer.currentSystem @throws(classOf[java.io.ObjectStreamException]) def readResolve(): AnyRef = currentSystem.value match { @@ -399,7 +399,7 @@ case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) private[akka] object DeadLetterActorRef { class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? @throws(classOf[java.io.ObjectStreamException]) - private def readResolve(): AnyRef = Serialization.currentSystem.value.deadLetters + private def readResolve(): AnyRef = akka.serialization.JavaSerializer.currentSystem.value.deadLetters } val serialized = new SerializedDeadLetterActorRef diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 688c036380..98408dca65 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -318,12 +318,12 @@ class LocalActorRefProvider( settings: ActorSystem.Settings, eventStream: EventStream, scheduler: Scheduler, - classloader: ClassLoader) = + propertyMaster: PropertyMaster) = this(_systemName, settings, eventStream, scheduler, - new Deployer(settings, classloader)) + new Deployer(settings, propertyMaster)) val rootPath: ActorPath = RootActorPath(Address("akka", _systemName)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index b20cdf5f1c..05334f07d6 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -324,13 +324,13 @@ abstract class ExtendedActorSystem extends ActorSystem { def deathWatch: DeathWatch /** - * ClassLoader which is used for reflective accesses internally. This is set - * to the context class loader, if one is set, or the class loader which + * ClassLoader wrapper which is used for reflective accesses internally. This is set + * to use the context class loader, if one is set, or the class loader which * loaded the ActorSystem implementation. The context class loader is also * set on all threads created by the ActorSystem, if one was set during * creation. */ - def internalClassLoader: ClassLoader + def propertyMaster: PropertyMaster } class ActorSystemImpl(val name: String, applicationConfig: Config) extends ExtendedActorSystem { @@ -356,6 +356,34 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten final val threadFactory: MonitorableThreadFactory = MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader), uncaughtExceptionHandler) + /** + * This is an extension point: by overriding this method, subclasses can + * control all reflection activities of an actor system. + */ + protected def createPropertyMaster(): PropertyMaster = new DefaultPropertyMaster(findClassLoader) + + protected def findClassLoader: ClassLoader = + Option(Thread.currentThread.getContextClassLoader) orElse + (Reflect.getCallerClass map findCaller) getOrElse + getClass.getClassLoader + + private def findCaller(get: Int ⇒ Class[_]): ClassLoader = { + val frames = Iterator.from(2).map(get) + frames dropWhile { c ⇒ + c != null && + (c.getName.startsWith("akka.actor.ActorSystem") || + c.getName.startsWith("scala.Option") || + c.getName.startsWith("scala.collection.Iterator") || + c.getName.startsWith("akka.util.Reflect")) + } next () match { + case null ⇒ getClass.getClassLoader + case c ⇒ c.getClassLoader + } + } + + private val _pm: PropertyMaster = createPropertyMaster() + def propertyMaster: PropertyMaster = _pm + def logConfiguration(): Unit = log.info(settings.toString) protected def systemImpl = this @@ -406,17 +434,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten val scheduler: Scheduler = createScheduler() - val internalClassLoader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader - val provider: ActorRefProvider = { val arguments = Seq( classOf[String] -> name, classOf[Settings] -> settings, classOf[EventStream] -> eventStream, classOf[Scheduler] -> scheduler, - classOf[ClassLoader] -> internalClassLoader) + classOf[PropertyMaster] -> propertyMaster) - ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, internalClassLoader) match { + propertyMaster.getInstanceFor[ActorRefProvider](ProviderClass, arguments) match { case Left(e) ⇒ throw e case Right(p) ⇒ p } @@ -438,7 +464,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten def locker: Locker = provider.locker val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( - threadFactory, eventStream, deadLetterMailbox, scheduler, internalClassLoader)) + threadFactory, eventStream, deadLetterMailbox, scheduler, propertyMaster)) val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher @@ -557,8 +583,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten private def loadExtensions() { import scala.collection.JavaConversions._ settings.config.getStringList("akka.extensions") foreach { fqcn ⇒ - import ReflectiveAccess.{ getObjectFor, createInstance, noParams, noArgs } - getObjectFor[AnyRef](fqcn, internalClassLoader).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match { + propertyMaster.getObjectFor[AnyRef](fqcn).fold(_ ⇒ propertyMaster.getInstanceFor[AnyRef](fqcn, Seq()), Right(_)) match { case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()); case Right(p: ExtensionId[_]) ⇒ registerExtension(p); case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index d561c74413..a26ce419d5 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -8,7 +8,6 @@ import akka.util.Duration import com.typesafe.config._ import akka.routing._ import java.util.concurrent.{ TimeUnit, ConcurrentHashMap } -import akka.util.ReflectiveAccess /** * This class represents deployment configuration for a given actor path. It is @@ -83,7 +82,7 @@ case object NoScopeGiven extends Scope { * * @author Jonas Bonér */ -class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader) { +class Deployer(val settings: ActorSystem.Settings, val propertyMaster: PropertyMaster) { import scala.collection.JavaConverters._ @@ -125,7 +124,7 @@ class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader) case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) case fqn ⇒ val args = Seq(classOf[Config] -> deployment) - ReflectiveAccess.createInstance[RouterConfig](fqn, args, classloader) match { + propertyMaster.getInstanceFor[RouterConfig](fqn, args) match { case Right(router) ⇒ router case Left(exception) ⇒ throw new IllegalArgumentException( diff --git a/akka-actor/src/main/scala/akka/actor/Extension.scala b/akka-actor/src/main/scala/akka/actor/Extension.scala index 547257e850..0b0088c78c 100644 --- a/akka-actor/src/main/scala/akka/actor/Extension.scala +++ b/akka-actor/src/main/scala/akka/actor/Extension.scala @@ -3,8 +3,6 @@ */ package akka.actor -import akka.util.ReflectiveAccess - /** * The basic ActorSystem covers all that is needed for locally running actors, * using futures and so on. In addition, more features can hook into it and @@ -73,12 +71,12 @@ trait ExtensionIdProvider { /** * This is a one-stop-shop if all you want is an extension which is - * constructed with the ActorSystemImpl as its only constructor argument: + * constructed with the ExtendedActorSystem as its only constructor argument: * * {{{ * object MyExt extends ExtensionKey[Ext] * - * class Ext(system: ActorSystemImpl) extends MyExt { + * class Ext(system: ExtendedActorSystem) extends MyExt { * ... * } * }}} @@ -89,7 +87,7 @@ trait ExtensionIdProvider { * public class MyExt extends Extension { * static final ExtensionKey key = new ExtensionKey(MyExt.class); * - * public MyExt(ActorSystemImpl system) { + * public MyExt(ExtendedActorSystem system) { * ... * } * }}} @@ -99,7 +97,7 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassManifest[T]) extend override def lookup(): ExtensionId[T] = this def createExtension(system: ExtendedActorSystem): T = - ReflectiveAccess.createInstance[T](m.erasure, Array[Class[_]](classOf[ActorSystemImpl]), Array[AnyRef](system)) match { + PropertyMaster.getInstanceFor[T](m.erasure, Seq(classOf[ExtendedActorSystem] -> system)) match { case Left(ex) ⇒ throw ex case Right(r) ⇒ r } diff --git a/akka-actor/src/main/scala/akka/actor/PropertyMaster.scala b/akka-actor/src/main/scala/akka/actor/PropertyMaster.scala new file mode 100644 index 0000000000..3f3a493b8f --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/PropertyMaster.scala @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor + +import akka.util.NonFatal +import java.lang.reflect.InvocationTargetException + +/** + * The property master is responsible for acquiring all props needed for a + * performance; in Akka this is the class which is used for reflectively + * loading all configurable parts of an actor system. + */ +trait PropertyMaster { + + def getClassFor[T: ClassManifest](fqcn: String): Either[Throwable, Class[_ <: T]] + + def getInstanceFor[T: ClassManifest](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] + + def getObjectFor[T: ClassManifest](fqcn: String): Either[Throwable, T] + + /** + * This is needed e.g. by the JavaSerializer to build the ObjectInputStream. + */ + def classLoader: ClassLoader + +} + +object PropertyMaster { + + def getInstanceFor[T: ClassManifest](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = { + val types = args.map(_._1).toArray + val values = args.map(_._2).toArray + withErrorHandling { + val constructor = clazz.getDeclaredConstructor(types: _*) + constructor.setAccessible(true) + val obj = constructor.newInstance(values: _*).asInstanceOf[T] + val t = classManifest[T].erasure + if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(clazz + " is not a subtype of " + t)) + } + } + + /** + * Caught exception is returned as Left(exception). + * Unwraps `InvocationTargetException` if its getTargetException is an `Exception`. + * Other `Throwable`, such as `Error` is thrown. + */ + @inline + final def withErrorHandling[T](body: ⇒ Either[Throwable, T]): Either[Throwable, T] = + try body catch { + case e: InvocationTargetException ⇒ + e.getTargetException match { + case NonFatal(t) ⇒ Left(t) + case t ⇒ throw t + } + case NonFatal(e) ⇒ Left(e) + } + +} + +class DefaultPropertyMaster(val classLoader: ClassLoader) extends PropertyMaster { + + import PropertyMaster.withErrorHandling + + override def getClassFor[T: ClassManifest](fqcn: String): Either[Throwable, Class[_ <: T]] = + try { + val c = classLoader.loadClass(fqcn).asInstanceOf[Class[_ <: T]] + val t = classManifest[T].erasure + if (t.isAssignableFrom(c)) Right(c) else Left(new ClassCastException(t + " is not assignable from " + c)) + } catch { + case NonFatal(e) ⇒ Left(e) + } + + override def getInstanceFor[T: ClassManifest](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = + getClassFor(fqcn).fold(Left(_), { c ⇒ + val types = args.map(_._1).toArray + val values = args.map(_._2).toArray + withErrorHandling { + val constructor = c.getDeclaredConstructor(types: _*) + constructor.setAccessible(true) + val obj = constructor.newInstance(values: _*) + val t = classManifest[T].erasure + if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(fqcn + " is not a subtype of " + t)) + } + }) + + override def getObjectFor[T: ClassManifest](fqcn: String): Either[Throwable, T] = { + getClassFor(fqcn).fold(Left(_), { c ⇒ + withErrorHandling { + val module = c.getDeclaredField("MODULE$") + module.setAccessible(true) + val t = classManifest[T].erasure + module.get(null) match { + case null ⇒ Left(new NullPointerException) + case x if !t.isInstance(x) ⇒ Left(new ClassCastException(fqcn + " is not a subtype of " + t)) + case x ⇒ Right(x.asInstanceOf[T]) + } + } + }) + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 41eba86bbd..cc1e02bb3e 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -129,7 +129,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi val serializedParameters = Array.ofDim[(Int, Class[_], Array[Byte])](ps.length) for (i ← 0 until ps.length) { val p = ps(i) - val s = SerializationExtension(Serialization.currentSystem.value).findSerializerFor(p) + val system = akka.serialization.JavaSerializer.currentSystem.value + val s = SerializationExtension(system).findSerializerFor(p) val m = if (s.includeManifest) p.getClass else null serializedParameters(i) = (s.identifier, m, s toBinary parameters(i)) //Mutable for the sake of sanity } @@ -146,7 +147,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi //TODO implement writeObject and readObject to serialize //TODO Possible optimization is to special encode the parameter-types to conserve space private def readResolve(): AnyRef = { - val system = akka.serialization.Serialization.currentSystem.value + val system = akka.serialization.JavaSerializer.currentSystem.value if (system eq null) throw new IllegalStateException( "Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." + " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }") @@ -158,7 +159,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity for (i ← 0 until a.length) { val (sId, manifest, bytes) = a(i) - deserializedParameters(i) = serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest)) + deserializedParameters(i) = + serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest)) } deserializedParameters diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 8a5bcfa385..db7774eeb7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -12,7 +12,6 @@ import akka.actor.ActorSystem import scala.annotation.tailrec import akka.event.EventStream import com.typesafe.config.Config -import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension import akka.jsr166y.ForkJoinPool import akka.util.NonFatal @@ -26,7 +25,7 @@ final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorS val ser = SerializationExtension(system) ser.serialize(msg) match { //Verify serializability case Left(t) ⇒ throw t - case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass, None) match { //Verify deserializability + case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass) match { //Verify deserializability case Left(t) ⇒ throw t case _ ⇒ //All good } @@ -369,7 +368,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit } case fqcn ⇒ val args = Seq(classOf[Config] -> config) - ReflectiveAccess.createInstance[MailboxType](fqcn, args, prerequisites.classloader) match { + prerequisites.propertyMaster.getInstanceFor[MailboxType](fqcn, args) match { case Right(instance) ⇒ instance case Left(exception) ⇒ throw new IllegalArgumentException( @@ -385,8 +384,10 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case fqcn ⇒ - val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites]) - ReflectiveAccess.createInstance[ExecutorServiceConfigurator](fqcn, constructorSignature, Array[AnyRef](config, prerequisites), prerequisites.classloader) match { + val args = Seq( + classOf[Config] -> config, + classOf[DispatcherPrerequisites] -> prerequisites) + prerequisites.propertyMaster.getInstanceFor[ExecutorServiceConfigurator](fqcn, args) match { case Right(instance) ⇒ instance case Left(exception) ⇒ throw new IllegalArgumentException( ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s], diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index d71604fd1a..8529c8f733 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -4,22 +4,24 @@ package akka.dispatch -import akka.actor.newUuid -import akka.util.{ Duration, ReflectiveAccess } -import akka.actor.ActorSystem -import akka.event.EventStream -import akka.actor.Scheduler -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory +import java.util.concurrent.{ ConcurrentHashMap, TimeUnit, ThreadFactory } + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import com.typesafe.config.{ ConfigFactory, Config } + +import Dispatchers.DefaultDispatcherId +import akka.actor.{ Scheduler, PropertyMaster, ActorSystem } import akka.event.Logging.Warning -import java.util.concurrent.{ ThreadFactory, TimeUnit, ConcurrentHashMap } +import akka.event.EventStream +import akka.util.Duration trait DispatcherPrerequisites { def threadFactory: ThreadFactory def eventStream: EventStream def deadLetterMailbox: Mailbox def scheduler: Scheduler - def classloader: ClassLoader + def propertyMaster: PropertyMaster } case class DefaultDispatcherPrerequisites( @@ -27,7 +29,7 @@ case class DefaultDispatcherPrerequisites( val eventStream: EventStream, val deadLetterMailbox: Mailbox, val scheduler: Scheduler, - val classloader: ClassLoader) extends DispatcherPrerequisites + val propertyMaster: PropertyMaster) extends DispatcherPrerequisites object Dispatchers { /** @@ -137,7 +139,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites) case fqn ⇒ val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) - ReflectiveAccess.createInstance[MessageDispatcherConfigurator](fqn, args, prerequisites.classloader) match { + prerequisites.propertyMaster.getInstanceFor[MessageDispatcherConfigurator](fqn, args) match { case Right(configurator) ⇒ configurator case Left(exception) ⇒ throw new IllegalArgumentException( diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index a6622b7875..c8dcedc3d5 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -6,7 +6,6 @@ package akka.event import akka.actor._ import akka.AkkaException import akka.actor.ActorSystem.Settings -import akka.util.ReflectiveAccess import akka.config.ConfigurationException import akka.util.ReentrantGuard import akka.util.duration._ @@ -101,7 +100,7 @@ trait LoggingBus extends ActorEventBus { if loggerName != StandardOutLoggerName } yield { try { - ReflectiveAccess.getClassFor[Actor](loggerName, system.internalClassLoader) match { + system.propertyMaster.getClassFor[Actor](loggerName) match { case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) case Left(exception) ⇒ throw exception } @@ -350,7 +349,7 @@ object Logging { object Extension extends ExtensionKey[LogExt] - class LogExt(system: ActorSystemImpl) extends Extension { + class LogExt(system: ExtendedActorSystem) extends Extension { private val loggerId = new AtomicInteger def id() = loggerId.incrementAndGet() } diff --git a/akka-actor/src/main/scala/akka/serialization/Format.scala b/akka-actor/src/main/scala/akka/serialization/Format.scala index 41a8eed658..7ce9d8a2e6 100644 --- a/akka-actor/src/main/scala/akka/serialization/Format.scala +++ b/akka-actor/src/main/scala/akka/serialization/Format.scala @@ -61,6 +61,7 @@ trait ToBinary[T <: Actor] { * Type class definition for Actor Serialization. * Client needs to implement Format[] for the respective actor. */ +// FIXME RK: should this go? It’s not used anywhere, looks like cluster residue. trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] /** @@ -97,7 +98,7 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable { val serializer: Serializer - def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T] + def fromBinary(bytes: Array[Byte], act: T): T = serializer.fromBinary(bytes, Some(act.getClass)).asInstanceOf[T] - def toBinary(ac: T) = serializer.toBinary(ac) + def toBinary(ac: T): Array[Byte] = serializer.toBinary(ac) } diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index e89adde8fb..1751f49082 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -5,30 +5,16 @@ package akka.serialization import akka.AkkaException -import akka.util.ReflectiveAccess import scala.util.DynamicVariable import com.typesafe.config.Config import akka.config.ConfigurationException -import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address } +import akka.actor.{ Extension, ExtendedActorSystem, Address } import java.util.concurrent.ConcurrentHashMap import akka.event.Logging case class NoSerializerFoundException(m: String) extends AkkaException(m) object Serialization { - /** - * This holds a reference to the current ActorSystem (the surrounding context) - * during serialization and deserialization. - * - * If you are using Serializers yourself, outside of SerializationExtension, - * you'll need to surround the serialization/deserialization with: - * - * currentSystem.withValue(system) { - * ...code... - * } - */ - val currentSystem = new DynamicVariable[ActorSystem](null) - /** * This holds a reference to the current transport address to be inserted * into local actor refs during serialization. @@ -74,7 +60,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * to either an Array of Bytes or an Exception if one was thrown. */ def serialize(o: AnyRef): Either[Exception, Array[Byte]] = - try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception ⇒ Left(e) } + try Right(findSerializerFor(o).toBinary(o)) + catch { case e: Exception ⇒ Left(e) } /** * Deserializes the given array of bytes using the specified serializer id, @@ -83,26 +70,18 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { */ def deserialize(bytes: Array[Byte], serializerId: Int, - clazz: Option[Class[_]], - classLoader: ClassLoader): Either[Exception, AnyRef] = - try { - currentSystem.withValue(system) { - Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, Some(classLoader))) - } - } catch { case e: Exception ⇒ Left(e) } + clazz: Option[Class[_]]): Either[Exception, AnyRef] = + try Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz)) + catch { case e: Exception ⇒ Left(e) } /** * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. * You can specify an optional ClassLoader to load the object into. * Returns either the resulting object or an Exception if one was thrown. */ - def deserialize( - bytes: Array[Byte], - clazz: Class[_], - classLoader: Option[ClassLoader]): Either[Exception, AnyRef] = - try { - currentSystem.withValue(system) { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) } - } catch { case e: Exception ⇒ Left(e) } + def deserialize(bytes: Array[Byte], clazz: Class[_]): Either[Exception, AnyRef] = + try Right(serializerFor(clazz).fromBinary(bytes, Some(clazz))) + catch { case e: Exception ⇒ Left(e) } /** * Returns the Serializer configured for the given object, returns the NullSerializer if it's null, @@ -149,8 +128,11 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { /** * Tries to load the specified Serializer by the FQN */ - def serializerOf(serializerFQN: String): Either[Exception, Serializer] = - ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs) + def serializerOf(serializerFQN: String): Either[Throwable, Serializer] = { + val pm = system.propertyMaster + pm.getInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)) + .fold(_ ⇒ pm.getInstanceFor[Serializer](serializerFQN, Seq()), Right(_)) + } /** * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index ffe7f50de9..0acd6e4f8c 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -6,11 +6,29 @@ package akka.serialization import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream } import akka.util.ClassLoaderObjectInputStream +import akka.actor.PropertyMaster +import akka.actor.ExtendedActorSystem +import scala.util.DynamicVariable /** - * A Serializer represents a bimap between an object and an array of bytes representing that object + * A Serializer represents a bimap between an object and an array of bytes representing that object. + * + * Serializers are loaded using reflection during [[akka.actor.ActorSystem]] + * start-up, where two constructors are tried in order: + * + *
    + *
  • taking exactly one argument of type [[akka.actor.ExtendedActorSystem]]; + * this should be the preferred one because all reflective loading of classes + * during deserialization should use ExtendedActorSystem.propertyMaster (see + * [[akka.actor.PropertyMaster]]), and
  • + *
  • without arguments, which is only an option if the serializer does not + * load classes using reflection.
  • + *
+ * + * Be sure to always use the PropertyManager for loading classes! */ trait Serializer extends scala.Serializable { + /** * Completely unique value to identify this implementation of Serializer, used to optimize network traffic * Values from 0 to 16 is reserved for Akka internal usage @@ -28,42 +46,52 @@ trait Serializer extends scala.Serializable { def includeManifest: Boolean /** - * Deserializes the given Array of Bytes into an AnyRef + * Produces an object from an array of bytes, with an optional type-hint; + * the class should be loaded using ActorSystem.propertyMaster. */ - def fromBinary(bytes: Array[Byte]): AnyRef = fromBinary(bytes, None, None) - - /** - * Deserializes the given Array of Bytes into an AnyRef with an optional type hint - */ - def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = fromBinary(bytes, manifest, None) - - /** - * Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into - */ - def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]], classLoader: Option[ClassLoader]): AnyRef + def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef } /** - * Java API for creating a Serializer + * Java API for creating a Serializer: make sure to include a constructor which + * takes exactly one argument of type [[akka.actor.ExtendedActorSystem]], because + * that is the preferred constructor which will be invoked when reflectively instantiating + * the JSerializer (also possible with empty constructor). */ abstract class JSerializer extends Serializer { - def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = - fromBinary(bytes, manifest.orNull, classLoader.orNull) + def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = + fromBinary(bytes, manifest.orNull) /** * This method should be overridden, * manifest and classLoader may be null. */ - def fromBinary(bytes: Array[Byte], manifest: Class[_], classLoader: ClassLoader): AnyRef + def fromBinary(bytes: Array[Byte], manifest: Class[_]): AnyRef } -object JavaSerializer extends JavaSerializer object NullSerializer extends NullSerializer +object JavaSerializer { + + /** + * This holds a reference to the current ActorSystem (the surrounding context) + * during serialization and deserialization. + * + * If you are using Serializers yourself, outside of SerializationExtension, + * you'll need to surround the serialization/deserialization with: + * + * currentSystem.withValue(system) { + * ...code... + * } + */ + val currentSystem = new DynamicVariable[ExtendedActorSystem](null) + +} + /** * This Serializer uses standard Java Serialization */ -class JavaSerializer extends Serializer { +class JavaSerializer(val system: ExtendedActorSystem) extends Serializer { def includeManifest: Boolean = false @@ -77,12 +105,11 @@ class JavaSerializer extends Serializer { bos.toByteArray } - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, - classLoader: Option[ClassLoader] = None): AnyRef = { - val in = - if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else - new ObjectInputStream(new ByteArrayInputStream(bytes)) - val obj = in.readObject + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { + val in = new ClassLoaderObjectInputStream(system.propertyMaster.classLoader, new ByteArrayInputStream(bytes)) + val obj = JavaSerializer.currentSystem.withValue(system) { + in.readObject + } in.close() obj } @@ -96,5 +123,5 @@ class NullSerializer extends Serializer { def includeManifest: Boolean = false def identifier = 0 def toBinary(o: AnyRef) = nullAsBytes - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = null + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null } diff --git a/akka-actor/src/main/scala/akka/util/Reflect.scala b/akka-actor/src/main/scala/akka/util/Reflect.scala new file mode 100644 index 0000000000..d2ec6ddfbd --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/Reflect.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.util + +object Reflect { + + val getCallerClass: Option[Int ⇒ Class[_]] = { + try { + val c = Class.forName("sun.reflect.Reflection"); + val m = c.getMethod("getCallerClass", Array(classOf[Int]): _*) + Some((i: Int) ⇒ m.invoke(null, Array[AnyRef](i.asInstanceOf[Integer]): _*).asInstanceOf[Class[_]]) + } catch { + case NonFatal(e) ⇒ None + } + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala deleted file mode 100644 index 19ad3bc995..0000000000 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.util - -import java.lang.reflect.InvocationTargetException - -object ReflectiveAccess { - - val loader = getClass.getClassLoader - val noParams: Array[Class[_]] = Array() - val noArgs: Array[AnyRef] = Array() - - def createInstance[T](clazz: Class[_], - params: Array[Class[_]], - args: Array[AnyRef]): Either[Exception, T] = withErrorHandling { - assert(clazz ne null) - assert(params ne null) - assert(args ne null) - val ctor = clazz.getDeclaredConstructor(params: _*) - ctor.setAccessible(true) - Right(ctor.newInstance(args: _*).asInstanceOf[T]) - } - - def createInstance[T](fqn: String, - params: Array[Class[_]], - args: Array[AnyRef], - classloader: ClassLoader = loader): Either[Exception, T] = withErrorHandling { - assert(params ne null) - assert(args ne null) - getClassFor(fqn, classloader) match { - case Right(value) ⇒ - val ctor = value.getDeclaredConstructor(params: _*) - ctor.setAccessible(true) - Right(ctor.newInstance(args: _*).asInstanceOf[T]) - case Left(exception) ⇒ Left(exception) //We could just cast this to Either[Exception, T] but it's ugly - } - } - - def createInstance[T](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Exception, T] = - createInstance(clazz, args.map(_._1).toArray, args.map(_._2).toArray) - - def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)], classloader: ClassLoader): Either[Exception, T] = - createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, classloader) - - def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Exception, T] = - createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, loader) - - //Obtains a reference to fqn.MODULE$ - def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try { - getClassFor(fqn, classloader) match { - case Right(value) ⇒ - val instance = value.getDeclaredField("MODULE$") - instance.setAccessible(true) - val obj = instance.get(null) - if (obj eq null) Left(new NullPointerException) else Right(obj.asInstanceOf[T]) - case Left(exception) ⇒ Left(exception) //We could just cast this to Either[Exception, T] but it's ugly - } - } catch { - case e: Exception ⇒ - Left(e) - } - - def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, Class[T]] = try { - assert(fqn ne null) - - // First, use the specified CL - val first = try { - Right(classloader.loadClass(fqn).asInstanceOf[Class[T]]) - } catch { - case c: ClassNotFoundException ⇒ Left(c) - } - - if (first.isRight) first - else { - // Second option is to use the ContextClassLoader - val second = try { - Right(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]]) - } catch { - case c: ClassNotFoundException ⇒ Left(c) - } - - if (second.isRight) second - else { - val third = try { - if (classloader ne loader) Right(loader.loadClass(fqn).asInstanceOf[Class[T]]) else Left(null) //Horrid - } catch { - case c: ClassNotFoundException ⇒ Left(c) - } - - if (third.isRight) third - else { - try { - Right(Class.forName(fqn).asInstanceOf[Class[T]]) // Last option is Class.forName - } catch { - case c: ClassNotFoundException ⇒ Left(c) - } - } - } - } - } catch { - case e: Exception ⇒ Left(e) - } - - /** - * Caught exception is returned as Left(exception). - * Unwraps `InvocationTargetException` if its getTargetException is an `Exception`. - * Other `Throwable`, such as `Error` is thrown. - */ - @inline - private final def withErrorHandling[T](body: ⇒ Either[Exception, T]): Either[Exception, T] = { - try { - body - } catch { - case e: InvocationTargetException ⇒ e.getTargetException match { - case t: Exception ⇒ Left(t) - case t ⇒ throw t - } - case e: Exception ⇒ - Left(e) - } - } - -} - diff --git a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java index b68f8f2e79..22a9f571d2 100644 --- a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java @@ -3,6 +3,7 @@ */ package akka.docs.serialization; +import akka.japi.Option; import akka.serialization.JSerializer; import akka.serialization.Serialization; import akka.serialization.SerializationExtension; @@ -13,6 +14,7 @@ import static org.junit.Assert.*; import akka.serialization.*; import akka.actor.ActorSystem; +import akka.actor.PropertyMaster; import com.typesafe.config.*; //#imports @@ -45,8 +47,7 @@ public class SerializationDocTestBase { // using the type hint (if any, see "includeManifest" above) // into the optionally provided classLoader. @Override public Object fromBinary(byte[] bytes, - Class clazz, - ClassLoader classLoader) { + Class clazz) { // Put your code that deserializes here //#... return null; @@ -128,7 +129,7 @@ public class SerializationDocTestBase { // Turn it back into an object, // the nulls are for the class manifest and for the classloader - String back = (String)serializer.fromBinary(bytes); + String back = (String)serializer.fromBinary(bytes, Option.>none().asScala()); // Voilá! assertEquals(original, back); diff --git a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala index 7f1553f75c..ae87e0d5ab 100644 --- a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala @@ -6,7 +6,7 @@ package akka.docs.serialization import org.scalatest.matchers.MustMatchers import akka.testkit._ //#imports -import akka.actor.ActorSystem +import akka.actor.{ ActorSystem, PropertyMaster } import akka.serialization._ import com.typesafe.config.ConfigFactory @@ -35,8 +35,7 @@ class MyOwnSerializer extends Serializer { // using the type hint (if any, see "includeManifest" above) // into the optionally provided classLoader. def fromBinary(bytes: Array[Byte], - clazz: Option[Class[_]], - classLoader: Option[ClassLoader] = None): AnyRef = { + clazz: Option[Class[_]]): AnyRef = { // Put your code that deserializes here //#... null @@ -147,9 +146,7 @@ class SerializationDocSpec extends AkkaSpec { val bytes = serializer.toBinary(original) // Turn it back into an object - val back = serializer.fromBinary(bytes, - manifest = None, - classLoader = None) + val back = serializer.fromBinary(bytes, manifest = None) // Voilá! back must equal(original) diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index cd2ca2d0f0..489d97d176 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -22,7 +22,7 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType { /** * @author Jonas Bonér */ -class BeanstalkBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { +class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { private val settings = BeanstalkBasedMailboxExtension(owner.system) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 8da17a4cc9..ccdbdc4145 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -17,7 +17,7 @@ class FileBasedMailboxType(config: Config) extends MailboxType { override def create(owner: ActorContext) = new FileBasedMailbox(owner) } -class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { +class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { val log = Logging(system, "FileBasedMailbox") diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 5e319dafac..69f7fb50c1 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -12,7 +12,7 @@ private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r } -abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { +abstract class DurableMailbox(val owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { import DurableExecutableMailboxConfig._ def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem] @@ -22,15 +22,13 @@ abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) } -trait DurableMessageSerialization { - - def owner: ActorContext +trait DurableMessageSerialization { this: DurableMailbox ⇒ def serialize(durableMessage: Envelope): Array[Byte] = { def serializeActorRef(ref: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(ref.path.toString).build - val message = MessageSerializer.serialize(owner.system, durableMessage.message.asInstanceOf[AnyRef]) + val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef]) val builder = RemoteMessageProtocol.newBuilder .setMessage(message) .setRecipient(serializeActorRef(owner.self)) @@ -41,13 +39,13 @@ trait DurableMessageSerialization { def deserialize(bytes: Array[Byte]): Envelope = { - def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = owner.system.actorFor(refProtocol.getPath) + def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = system.actorFor(refProtocol.getPath) val durableMessage = RemoteMessageProtocol.parseFrom(bytes) - val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage, getClass.getClassLoader) + val message = MessageSerializer.deserialize(system, durableMessage.getMessage) val sender = deserializeActorRef(durableMessage.getSender) - new Envelope(message, sender)(owner.system) + new Envelope(message, sender)(system) } } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala index cc36286e36..5aa314eb55 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala @@ -65,7 +65,7 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument] system.log.debug("Deserializing a durable message from MongoDB: {}", doc) val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData) - val msg = MessageSerializer.deserialize(system, msgData, system.internalClassLoader) + val msg = MessageSerializer.deserialize(system, msgData) val ownerPath = doc.as[String]("ownerPath") val senderPath = doc.as[String]("senderPath") val sender = system.actorFor(senderPath) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index fed643c7d1..d17a1221a8 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -32,7 +32,7 @@ class MongoBasedMailboxType(config: Config) extends MailboxType { * * @author Brendan W. McAdams */ -class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) { +class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage implicit val mailboxBSONSer = new BSONSerializableMailbox(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index 5d4e09e65e..b6cf3febc6 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -19,7 +19,7 @@ class RedisBasedMailboxType(config: Config) extends MailboxType { override def create(owner: ActorContext) = new RedisBasedMailbox(owner) } -class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { +class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { private val settings = RedisBasedMailboxExtension(owner.system) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 81e4c378eb..90fd381af1 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -20,7 +20,7 @@ class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { override def create(owner: ActorContext) = new ZooKeeperBasedMailbox(owner) } -class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { +class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { private val settings = ZooKeeperBasedMailboxExtension(owner.system) val queueNode = "/queues" diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 5301f2bdd0..ddb97d02c4 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -6,32 +6,25 @@ package akka.remote import akka.remote.RemoteProtocol._ import com.google.protobuf.ByteString -import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem import akka.serialization.SerializationExtension -import akka.util.ReflectiveAccess object MessageSerializer { - def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: ClassLoader): AnyRef = { - val clazz = if (messageProtocol.hasMessageManifest) { - Option(ReflectiveAccess.getClassFor[AnyRef]( - messageProtocol.getMessageManifest.toStringUtf8, - classLoader) match { - case Left(e) ⇒ throw e - case Right(r) ⇒ r - }) - } else None - SerializationExtension(system).deserialize( - messageProtocol.getMessage.toByteArray, - messageProtocol.getSerializerId, - clazz, - classLoader) match { + def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = { + val clazz = + if (messageProtocol.hasMessageManifest) { + system.propertyMaster.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8) + .fold(throw _, Some(_)) + } else None + SerializationExtension(system) + .deserialize(messageProtocol.getMessage.toByteArray, messageProtocol.getSerializerId, clazz) match { case Left(e) ⇒ throw e case Right(r) ⇒ r } } - def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = { + def serialize(system: ExtendedActorSystem, message: AnyRef): MessageProtocol = { val s = SerializationExtension(system) val serializer = s.findSerializerFor(message) val builder = MessageProtocol.newBuilder diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 872be5aa41..e62975ea9f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -12,7 +12,6 @@ import akka.event.EventStream import akka.config.ConfigurationException import java.util.concurrent.{ TimeoutException } import com.typesafe.config.Config -import akka.util.ReflectiveAccess import akka.serialization.Serialization import akka.serialization.SerializationExtension @@ -28,11 +27,11 @@ class RemoteActorRefProvider( val settings: ActorSystem.Settings, val eventStream: EventStream, val scheduler: Scheduler, - val classloader: ClassLoader) extends ActorRefProvider { + val propertyMaster: PropertyMaster) extends ActorRefProvider { val remoteSettings = new RemoteSettings(settings.config, systemName) - val deployer = new RemoteDeployer(settings, classloader) + val deployer = new RemoteDeployer(settings, propertyMaster) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer) @@ -84,7 +83,7 @@ class RemoteActorRefProvider( classOf[ActorSystemImpl] -> system, classOf[RemoteActorRefProvider] -> this) - ReflectiveAccess.createInstance[RemoteTransport](fqn, args, system.internalClassLoader) match { + system.propertyMaster.getInstanceFor[RemoteTransport](fqn, args) match { case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) case Right(remote) ⇒ remote } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index 9686d295ad..ec6ab165a3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -12,7 +12,7 @@ case class RemoteScope(node: Address) extends Scope { def withFallback(other: Scope): Scope = this } -class RemoteDeployer(_settings: ActorSystem.Settings, _classloader: ClassLoader) extends Deployer(_settings, _classloader) { +class RemoteDeployer(_settings: ActorSystem.Settings, _pm: PropertyMaster) extends Deployer(_settings, _pm) { override protected def parseConfig(path: String, config: Config): Option[Deploy] = { import scala.collection.JavaConverters._ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index b40992d12a..256451bc0a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -219,7 +219,7 @@ class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) { lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver) - lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, getClass.getClassLoader) + lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage) override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender } diff --git a/akka-remote/src/main/scala/akka/serialization/ProtobufSerializer.scala b/akka-remote/src/main/scala/akka/serialization/ProtobufSerializer.scala index f733e8188f..c970a4b733 100644 --- a/akka-remote/src/main/scala/akka/serialization/ProtobufSerializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/ProtobufSerializer.scala @@ -5,6 +5,7 @@ package akka.serialization import com.google.protobuf.Message +import akka.actor.PropertyMaster /** * This Serializer serializes `com.google.protobuf.Message`s @@ -19,7 +20,7 @@ class ProtobufSerializer extends Serializer { case _ ⇒ throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]") } - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match { case None ⇒ throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf") case Some(c) ⇒ c.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message] diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index d42cfcf165..21b1221b45 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -5,7 +5,7 @@ package akka.testkit import akka.actor._ -import akka.util.{ ReflectiveAccess, Duration } +import akka.util.Duration import java.util.concurrent.atomic.AtomicLong import scala.collection.immutable.Stack import akka.dispatch._ @@ -121,8 +121,7 @@ object TestActorRef { def apply[T <: Actor](implicit m: Manifest[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) def apply[T <: Actor](name: String)(implicit m: Manifest[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({ - import ReflectiveAccess.{ createInstance, noParams, noArgs } - createInstance[T](m.erasure, noParams, noArgs) match { + PropertyMaster.getInstanceFor[T](m.erasure, Seq()) match { case Right(value) ⇒ value case Left(exception) ⇒ throw new ActorInitializationException(null, "Could not instantiate Actor" +