#2469 - Switching to scala.util.Try instead of Either[Throwable, T] in the codebase
This commit is contained in:
parent
6e4b0dc3de
commit
4eee04cb60
21 changed files with 123 additions and 249 deletions
|
|
@ -260,7 +260,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
} yield b + "-" + c
|
||||
|
||||
Await.result(future1, timeout.duration) must be("10-14")
|
||||
assert(checkType(future1, manifest[String]))
|
||||
assert(checkType(future1, scala.reflect.classTag[String]))
|
||||
intercept[ClassCastException] { Await.result(future2, timeout.duration) }
|
||||
system.stop(actor)
|
||||
}
|
||||
|
|
@ -479,7 +479,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
}
|
||||
}))
|
||||
|
||||
val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int])
|
||||
val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo scala.reflect.classTag[Int])
|
||||
|
||||
assert(Await.result(Future.sequence(oddFutures), timeout.duration).sum === 10000)
|
||||
system.stop(oddActor)
|
||||
|
|
@ -939,9 +939,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
f((future, message) ⇒ {
|
||||
future.value must be('defined)
|
||||
future.value.get must be('failure)
|
||||
future.value.get match {
|
||||
case Failure(f) ⇒ f.getMessage must be(message)
|
||||
}
|
||||
val Failure(f) = future.value.get
|
||||
f.getMessage must be(message)
|
||||
})
|
||||
}
|
||||
"throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) }
|
||||
|
|
|
|||
|
|
@ -87,50 +87,23 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
|
|||
}
|
||||
|
||||
"serialize Address" in {
|
||||
val b = serialize(addr) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Address]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(add) ⇒ assert(add === addr)
|
||||
}
|
||||
assert(deserialize(serialize(addr).get, classOf[Address]).get === addr)
|
||||
}
|
||||
|
||||
"serialize Person" in {
|
||||
|
||||
val b = serialize(person) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Person]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(p) ⇒ assert(p === person)
|
||||
}
|
||||
assert(deserialize(serialize(person).get, classOf[Person]).get === person)
|
||||
}
|
||||
|
||||
"serialize record with default serializer" in {
|
||||
|
||||
val r = Record(100, person)
|
||||
val b = serialize(r) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
deserialize(b.asInstanceOf[Array[Byte]], classOf[Record]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(p) ⇒ assert(p === r)
|
||||
}
|
||||
assert(deserialize(serialize(r).get, classOf[Record]).get === r)
|
||||
}
|
||||
|
||||
"not serialize ActorCell" in {
|
||||
val a = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case o: ObjectOutputStream ⇒
|
||||
try {
|
||||
o.writeObject(this)
|
||||
} catch {
|
||||
case _: NotSerializableException ⇒ testActor ! "pass"
|
||||
}
|
||||
try o.writeObject(this) catch { case _: NotSerializableException ⇒ testActor ! "pass" }
|
||||
}
|
||||
}))
|
||||
a ! new ObjectOutputStream(new ByteArrayOutputStream())
|
||||
|
|
|
|||
|
|
@ -413,7 +413,7 @@ class LocalActorRefProvider(
|
|||
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
|
||||
|
||||
private def guardianSupervisorStrategyConfigurator =
|
||||
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x ⇒ x)
|
||||
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).get
|
||||
|
||||
/**
|
||||
* Overridable supervision strategy to be used by the “/user” guardian.
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, R
|
|||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import akka.actor.cell.ChildrenContainer
|
||||
import scala.concurrent.util.FiniteDuration
|
||||
import util.{ Failure, Success }
|
||||
|
||||
object ActorSystem {
|
||||
|
||||
|
|
@ -540,10 +541,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
|||
classOf[Scheduler] -> scheduler,
|
||||
classOf[DynamicAccess] -> dynamicAccess)
|
||||
|
||||
dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments) match {
|
||||
case Left(e) ⇒ throw e
|
||||
case Right(p) ⇒ p
|
||||
}
|
||||
dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get
|
||||
}
|
||||
|
||||
def deadLetters: ActorRef = provider.deadLetters
|
||||
|
|
@ -678,13 +676,12 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
|||
private def loadExtensions() {
|
||||
import scala.collection.JavaConversions._
|
||||
settings.config.getStringList("akka.extensions") foreach { fqcn ⇒
|
||||
dynamicAccess.getObjectFor[AnyRef](fqcn).fold(_ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()), Right(_)) match {
|
||||
case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup());
|
||||
case Right(p: ExtensionId[_]) ⇒ registerExtension(p);
|
||||
case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
|
||||
case Left(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
|
||||
dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()) } match {
|
||||
case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup())
|
||||
case Success(p: ExtensionId[_]) ⇒ registerExtension(p)
|
||||
case Success(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
|
||||
case Failure(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -156,15 +156,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
|
|||
case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer)
|
||||
case fqn ⇒
|
||||
val args = Seq(classOf[Config] -> deployment)
|
||||
dynamicAccess.createInstanceFor[RouterConfig](fqn, args) match {
|
||||
case Right(router) ⇒ router
|
||||
case Left(exception) ⇒
|
||||
throw new IllegalArgumentException(
|
||||
("Cannot instantiate router [%s], defined in [%s], " +
|
||||
"make sure it extends [akka.routing.RouterConfig] and has constructor with " +
|
||||
"[com.typesafe.config.Config] parameter")
|
||||
.format(fqn, key), exception)
|
||||
}
|
||||
dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({
|
||||
case exception ⇒ throw new IllegalArgumentException(
|
||||
("Cannot instantiate router [%s], defined in [%s], " +
|
||||
"make sure it extends [akka.routing.RouterConfig] and has constructor with " +
|
||||
"[com.typesafe.config.Config] parameter")
|
||||
.format(fqn, key), exception)
|
||||
}).get
|
||||
}
|
||||
|
||||
Some(Deploy(key, deployment, router, NoScopeGiven))
|
||||
|
|
|
|||
|
|
@ -1,11 +1,12 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
* CopySuccess (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import scala.reflect.ClassTag
|
||||
import scala.util.Try
|
||||
|
||||
/**
|
||||
* The DynamicAccess implementation is the class which is used for
|
||||
|
|
@ -16,7 +17,6 @@ import scala.reflect.ClassTag
|
|||
* unless they are extending Akka in ways which go beyond simple Extensions.
|
||||
*/
|
||||
abstract class DynamicAccess {
|
||||
|
||||
/**
|
||||
* Convenience method which given a `Class[_]` object and a constructor description
|
||||
* will create a new instance of that class.
|
||||
|
|
@ -25,23 +25,13 @@ abstract class DynamicAccess {
|
|||
* val obj = DynamicAccess.createInstanceFor(clazz, Seq(classOf[Config] -> config, classOf[String] -> name))
|
||||
* }}}
|
||||
*/
|
||||
def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = {
|
||||
val types = args.map(_._1).toArray
|
||||
val values = args.map(_._2).toArray
|
||||
withErrorHandling {
|
||||
val constructor = clazz.getDeclaredConstructor(types: _*)
|
||||
constructor.setAccessible(true)
|
||||
val obj = constructor.newInstance(values: _*).asInstanceOf[T]
|
||||
val t = implicitly[ClassTag[T]].runtimeClass
|
||||
if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(clazz + " is not a subtype of " + t))
|
||||
}
|
||||
}
|
||||
def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T]
|
||||
|
||||
/**
|
||||
* Obtain a `Class[_]` object loaded with the right class loader (i.e. the one
|
||||
* Obtain a `Class[_]` object loaded with the Success class loader (i.e. the one
|
||||
* returned by `classLoader`).
|
||||
*/
|
||||
def getClassFor[T: ClassTag](fqcn: String): Either[Throwable, Class[_ <: T]]
|
||||
def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]]
|
||||
|
||||
/**
|
||||
* Obtain an object conforming to the type T, which is expected to be
|
||||
|
|
@ -50,35 +40,18 @@ abstract class DynamicAccess {
|
|||
* `args` argument. The exact usage of args depends on which type is requested,
|
||||
* see the relevant requesting code for details.
|
||||
*/
|
||||
def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T]
|
||||
def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T]
|
||||
|
||||
/**
|
||||
* Obtain the Scala “object” instance for the given fully-qualified class name, if there is one.
|
||||
*/
|
||||
def getObjectFor[T: ClassTag](fqcn: String): Either[Throwable, T]
|
||||
def getObjectFor[T: ClassTag](fqcn: String): Try[T]
|
||||
|
||||
/**
|
||||
* This is the class loader to be used in those special cases where the
|
||||
* other factory method are not applicable (e.g. when constructing a ClassLoaderBinaryInputStream).
|
||||
*/
|
||||
def classLoader: ClassLoader
|
||||
|
||||
/**
|
||||
* Caught exception is returned as Left(exception).
|
||||
* Unwraps `InvocationTargetException` if its getTargetException is an `Exception`.
|
||||
* Other `Throwable`, such as `Error` is thrown.
|
||||
*/
|
||||
@inline
|
||||
final def withErrorHandling[T](body: ⇒ Either[Throwable, T]): Either[Throwable, T] =
|
||||
try body catch {
|
||||
case e: InvocationTargetException ⇒
|
||||
e.getTargetException match {
|
||||
case NonFatal(t) ⇒ Left(t)
|
||||
case t ⇒ throw t
|
||||
}
|
||||
case NonFatal(e) ⇒ Left(e)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -89,42 +62,41 @@ abstract class DynamicAccess {
|
|||
* by default.
|
||||
*/
|
||||
class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {
|
||||
//FIXME switch to Scala Reflection for 2.10
|
||||
override def getClassFor[T: ClassTag](fqcn: String): Either[Throwable, Class[_ <: T]] =
|
||||
try {
|
||||
|
||||
override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] =
|
||||
Try[Class[_ <: T]]({
|
||||
val c = classLoader.loadClass(fqcn).asInstanceOf[Class[_ <: T]]
|
||||
val t = implicitly[ClassTag[T]].runtimeClass
|
||||
if (t.isAssignableFrom(c)) Right(c) else Left(new ClassCastException(t + " is not assignable from " + c))
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ Left(e)
|
||||
}
|
||||
|
||||
override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] =
|
||||
getClassFor(fqcn).fold(Left(_), { c ⇒
|
||||
val types = args.map(_._1).toArray
|
||||
val values = args.map(_._2).toArray
|
||||
withErrorHandling {
|
||||
val constructor = c.getDeclaredConstructor(types: _*)
|
||||
constructor.setAccessible(true)
|
||||
val obj = constructor.newInstance(values: _*)
|
||||
val t = implicitly[ClassTag[T]].runtimeClass
|
||||
if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(fqcn + " is not a subtype of " + t))
|
||||
}
|
||||
if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c)
|
||||
})
|
||||
|
||||
override def getObjectFor[T: ClassTag](fqcn: String): Either[Throwable, T] = {
|
||||
getClassFor(fqcn).fold(Left(_), { c ⇒
|
||||
withErrorHandling {
|
||||
override def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] =
|
||||
Try {
|
||||
val types = args.map(_._1).toArray
|
||||
val values = args.map(_._2).toArray
|
||||
val constructor = clazz.getDeclaredConstructor(types: _*)
|
||||
constructor.setAccessible(true)
|
||||
val obj = constructor.newInstance(values: _*)
|
||||
val t = implicitly[ClassTag[T]].runtimeClass
|
||||
if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
|
||||
} recover { case i: InvocationTargetException if i.getTargetException ne null ⇒ throw i.getTargetException }
|
||||
|
||||
override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] =
|
||||
getClassFor(fqcn) flatMap { c ⇒ createInstanceFor(c, args) }
|
||||
|
||||
override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = {
|
||||
getClassFor(fqcn) flatMap { c ⇒
|
||||
Try {
|
||||
val module = c.getDeclaredField("MODULE$")
|
||||
module.setAccessible(true)
|
||||
val t = implicitly[ClassTag[T]].runtimeClass
|
||||
module.get(null) match {
|
||||
case null ⇒ Left(new NullPointerException)
|
||||
case x if !t.isInstance(x) ⇒ Left(new ClassCastException(fqcn + " is not a subtype of " + t))
|
||||
case x ⇒ Right(x.asInstanceOf[T])
|
||||
case null ⇒ throw new NullPointerException
|
||||
case x if !t.isInstance(x) ⇒ throw new ClassCastException(fqcn + " is not a subtype of " + t)
|
||||
case x: T ⇒ x
|
||||
}
|
||||
}
|
||||
})
|
||||
} recover { case i: InvocationTargetException if i.getTargetException ne null ⇒ throw i.getTargetException }
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -98,9 +98,5 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassTag[T]) extends Ext
|
|||
def this(clazz: Class[T]) = this()(ClassTag(clazz))
|
||||
|
||||
override def lookup(): ExtensionId[T] = this
|
||||
def createExtension(system: ExtendedActorSystem): T =
|
||||
system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)) match {
|
||||
case Left(ex) ⇒ throw ex
|
||||
case Right(r) ⇒ r
|
||||
}
|
||||
def createExtension(system: ExtendedActorSystem): T = system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)).get
|
||||
}
|
||||
|
|
|
|||
|
|
@ -170,13 +170,7 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = {
|
||||
if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
|
||||
val ser = SerializationExtension(cell.system)
|
||||
ser.serialize(props.creator) match {
|
||||
case Left(t) ⇒ throw t
|
||||
case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match {
|
||||
case Left(t) ⇒ throw t
|
||||
case _ ⇒ //All good
|
||||
}
|
||||
}
|
||||
ser.deserialize(ser.serialize(props.creator).get, props.creator.getClass).get
|
||||
}
|
||||
/*
|
||||
* in case we are currently terminating, fail external attachChild requests
|
||||
|
|
|
|||
|
|
@ -25,13 +25,7 @@ object Envelope {
|
|||
if (msg eq null) throw new InvalidMessageException("Message is null")
|
||||
if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) {
|
||||
val ser = SerializationExtension(system)
|
||||
ser.serialize(msg) match { //Verify serializability
|
||||
case Left(t) ⇒ throw t
|
||||
case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass) match { //Verify deserializability
|
||||
case Left(t) ⇒ throw t
|
||||
case _ ⇒ //All good
|
||||
}
|
||||
}
|
||||
ser.deserialize(ser.serialize(msg).get, msg.getClass).get
|
||||
}
|
||||
new Envelope(message, sender)
|
||||
}
|
||||
|
|
@ -426,14 +420,13 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
|||
case "bounded" ⇒ new BoundedMailbox(prerequisites.settings, config)
|
||||
case fqcn ⇒
|
||||
val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config)
|
||||
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
|
||||
case Right(instance) ⇒ instance
|
||||
case Left(exception) ⇒
|
||||
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
|
||||
case exception ⇒
|
||||
throw new IllegalArgumentException(
|
||||
("Cannot instantiate MailboxType [%s], defined in [%s], " +
|
||||
"make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters")
|
||||
.format(fqcn, config.getString("id")), exception)
|
||||
}
|
||||
}).get
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -445,13 +438,12 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
|||
val args = Seq(
|
||||
classOf[Config] -> config,
|
||||
classOf[DispatcherPrerequisites] -> prerequisites)
|
||||
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args) match {
|
||||
case Right(instance) ⇒ instance
|
||||
case Left(exception) ⇒ throw new IllegalArgumentException(
|
||||
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
|
||||
case exception ⇒ throw new IllegalArgumentException(
|
||||
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
|
||||
make sure it has an accessible constructor with a [%s,%s] signature""")
|
||||
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
|
||||
}
|
||||
}).get
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -148,15 +148,14 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
|||
case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
|
||||
case fqn ⇒
|
||||
val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
|
||||
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args) match {
|
||||
case Right(configurator) ⇒ configurator
|
||||
case Left(exception) ⇒
|
||||
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
|
||||
case exception ⇒
|
||||
throw new IllegalArgumentException(
|
||||
("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
|
||||
"make sure it has constructor with [com.typesafe.config.Config] and " +
|
||||
"[akka.dispatch.DispatcherPrerequisites] parameters")
|
||||
.format(fqn, cfg.getString("id")), exception)
|
||||
}
|
||||
}).get
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,17 +101,13 @@ trait LoggingBus extends ActorEventBus {
|
|||
loggerName ← defaultLoggers
|
||||
if loggerName != StandardOutLogger.getClass.getName
|
||||
} yield {
|
||||
try {
|
||||
system.dynamicAccess.getClassFor[Actor](loggerName) match {
|
||||
case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName)
|
||||
case Left(exception) ⇒ throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
throw new ConfigurationException(
|
||||
"Event Handler specified in config can't be loaded [" + loggerName +
|
||||
"] due to [" + e.toString + "]", e)
|
||||
}
|
||||
system.dynamicAccess.getClassFor[Actor](loggerName).map({
|
||||
case actorClass ⇒ addLogger(system, actorClass, level, logName)
|
||||
}).recover({
|
||||
case e ⇒ throw new ConfigurationException(
|
||||
"Event Handler specified in config can't be loaded [" + loggerName +
|
||||
"] due to [" + e.toString + "]", e)
|
||||
}).get
|
||||
}
|
||||
guard.withGuard {
|
||||
loggers = myloggers
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
package akka.serialization
|
||||
|
||||
import akka.AkkaException
|
||||
import scala.util.DynamicVariable
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess }
|
||||
import akka.event.Logging
|
||||
|
|
@ -13,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap
|
|||
import scala.util.control.NonFatal
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import java.io.NotSerializableException
|
||||
import util.{ Try, DynamicVariable }
|
||||
|
||||
object Serialization {
|
||||
|
||||
|
|
@ -56,9 +56,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
|
||||
* to either an Array of Bytes or an Exception if one was thrown.
|
||||
*/
|
||||
def serialize(o: AnyRef): Either[Throwable, Array[Byte]] =
|
||||
try Right(findSerializerFor(o).toBinary(o))
|
||||
catch { case NonFatal(e) ⇒ Left(e) }
|
||||
def serialize(o: AnyRef): Try[Array[Byte]] = Try(findSerializerFor(o).toBinary(o))
|
||||
|
||||
/**
|
||||
* Deserializes the given array of bytes using the specified serializer id,
|
||||
|
|
@ -67,18 +65,14 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
*/
|
||||
def deserialize(bytes: Array[Byte],
|
||||
serializerId: Int,
|
||||
clazz: Option[Class[_]]): Either[Throwable, AnyRef] =
|
||||
try Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
|
||||
catch { case NonFatal(e) ⇒ Left(e) }
|
||||
clazz: Option[Class[_]]): Try[AnyRef] = Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
|
||||
|
||||
/**
|
||||
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
|
||||
* You can specify an optional ClassLoader to load the object into.
|
||||
* Returns either the resulting object or an Exception if one was thrown.
|
||||
*/
|
||||
def deserialize(bytes: Array[Byte], clazz: Class[_]): Either[Throwable, AnyRef] =
|
||||
try Right(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
|
||||
catch { case NonFatal(e) ⇒ Left(e) }
|
||||
def deserialize(bytes: Array[Byte], clazz: Class[_]): Try[AnyRef] = Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
|
||||
|
||||
/**
|
||||
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null.
|
||||
|
|
@ -128,28 +122,24 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
* Tries to load the specified Serializer by the fully-qualified name; the actual
|
||||
* loading is performed by the system’s [[akka.actor.DynamicAccess]].
|
||||
*/
|
||||
def serializerOf(serializerFQN: String): Either[Throwable, Serializer] =
|
||||
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)).fold(_ ⇒
|
||||
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()), Right(_))
|
||||
def serializerOf(serializerFQN: String): Try[Serializer] =
|
||||
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)) recoverWith {
|
||||
case _ ⇒ system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq())
|
||||
}
|
||||
|
||||
/**
|
||||
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
|
||||
* By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer
|
||||
*/
|
||||
private val serializers: Map[String, Serializer] =
|
||||
for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).fold(throw _, identity)
|
||||
for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).get
|
||||
|
||||
/**
|
||||
* bindings is a Seq of tuple representing the mapping from Class to Serializer.
|
||||
* It is primarily ordered by the most specific classes first, and secondly in the configured order.
|
||||
*/
|
||||
private[akka] val bindings: Seq[ClassSerializer] = {
|
||||
val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield {
|
||||
val c = system.dynamicAccess.getClassFor[Any](k).fold(throw _, identity[Class[_]])
|
||||
(c, serializers(v))
|
||||
}
|
||||
sort(configuredBindings)
|
||||
}
|
||||
private[akka] val bindings: Seq[ClassSerializer] =
|
||||
sort(for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v)))
|
||||
|
||||
/**
|
||||
* Sort so that subtypes always precede their supertypes, but without
|
||||
|
|
|
|||
|
|
@ -46,9 +46,9 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
|
|||
val failureDetector = {
|
||||
import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn }
|
||||
system.dynamicAccess.createInstanceFor[FailureDetector](
|
||||
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).fold(
|
||||
e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString),
|
||||
identity)
|
||||
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).recover({
|
||||
case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
|
||||
}).get
|
||||
}
|
||||
|
||||
new Cluster(system, failureDetector)
|
||||
|
|
|
|||
|
|
@ -372,8 +372,8 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
val f: Future[Result] =
|
||||
for {
|
||||
x ← ask(actorA, Request).mapTo[Int] // call pattern directly
|
||||
s ← actorB ask Request mapTo manifest[String] // call by implicit conversion
|
||||
d ← actorC ? Request mapTo manifest[Double] // call by symbolic name
|
||||
s ← (actorB ask Request).mapTo[String] // call by implicit conversion
|
||||
d ← (actorC ? Request).mapTo[Double] // call by symbolic name
|
||||
} yield Result(x, s, d)
|
||||
|
||||
f pipeTo actorD // .. or ..
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package docs.routing
|
|||
import RouterDocSpec.MyActor
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.routing.RoundRobinRouter
|
||||
import akka.actor.{ActorRef, Props, Actor}
|
||||
import akka.actor.{ ActorRef, Props, Actor }
|
||||
|
||||
object RouterDocSpec {
|
||||
class MyActor extends Actor {
|
||||
|
|
|
|||
|
|
@ -49,12 +49,12 @@ object ZeromqDocSpec {
|
|||
val timestamp = System.currentTimeMillis
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).fold(throw _, identity)
|
||||
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).get
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload)))
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).fold(throw _, identity)
|
||||
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload)))
|
||||
}
|
||||
|
|
@ -71,18 +71,12 @@ object ZeromqDocSpec {
|
|||
def receive = {
|
||||
// the first frame is the topic, second is the message
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Heap]) match {
|
||||
case Right(Heap(timestamp, used, max)) ⇒
|
||||
log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get
|
||||
log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
|
||||
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.load" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Load]) match {
|
||||
case Right(Load(timestamp, loadAverage)) ⇒
|
||||
log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
val Load(timestamp, loadAverage) = ser.deserialize(m.payload(1), classOf[Load]).get
|
||||
log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
|
||||
}
|
||||
}
|
||||
//#logger
|
||||
|
|
@ -97,13 +91,10 @@ object ZeromqDocSpec {
|
|||
def receive = {
|
||||
// the first frame is the topic, second is the message
|
||||
case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒
|
||||
ser.deserialize(m.payload(1), classOf[Heap]) match {
|
||||
case Right(Heap(timestamp, used, max)) ⇒
|
||||
if ((used.toDouble / max) > 0.9) count += 1
|
||||
else count = 0
|
||||
if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max))
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get
|
||||
if ((used.toDouble / max) > 0.9) count += 1
|
||||
else count = 0
|
||||
if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max))
|
||||
}
|
||||
}
|
||||
//#alerter
|
||||
|
|
|
|||
|
|
@ -20,16 +20,10 @@ private[akka] object MessageSerializer {
|
|||
* Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message
|
||||
*/
|
||||
def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = {
|
||||
val clazz =
|
||||
if (messageProtocol.hasMessageManifest) {
|
||||
system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8)
|
||||
.fold(throw _, Some(_))
|
||||
} else None
|
||||
SerializationExtension(system)
|
||||
.deserialize(messageProtocol.getMessage.toByteArray, messageProtocol.getSerializerId, clazz) match {
|
||||
case Left(e) ⇒ throw e
|
||||
case Right(r) ⇒ r
|
||||
}
|
||||
SerializationExtension(system).deserialize(
|
||||
messageProtocol.getMessage.toByteArray,
|
||||
messageProtocol.getSerializerId,
|
||||
if (messageProtocol.hasMessageManifest) Some(system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8).get) else None).get
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -72,10 +72,9 @@ class RemoteActorRefProvider(
|
|||
classOf[ExtendedActorSystem] -> system,
|
||||
classOf[RemoteActorRefProvider] -> this)
|
||||
|
||||
system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match {
|
||||
case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem)
|
||||
case Right(remote) ⇒ remote
|
||||
}
|
||||
system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args).recover({
|
||||
case problem ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem)
|
||||
}).get
|
||||
}
|
||||
|
||||
_log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")")
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import akka.remote.RemoteProtocol.{ DaemonMsgCreateProtocol, DeployProtocol, Pro
|
|||
import akka.routing.{ NoRouter, RouterConfig }
|
||||
import akka.actor.FromClassCreator
|
||||
import scala.reflect.ClassTag
|
||||
import util.{ Failure, Success }
|
||||
|
||||
/**
|
||||
* Serializes akka's internal DaemonMsgCreate using protobuf
|
||||
|
|
@ -88,14 +89,10 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
|
|||
|
||||
def props = {
|
||||
val creator =
|
||||
if (proto.getProps.hasFromClassCreator) {
|
||||
system.dynamicAccess.getClassFor[Actor](proto.getProps.getFromClassCreator) match {
|
||||
case Right(clazz) ⇒ FromClassCreator(clazz)
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
} else {
|
||||
if (proto.getProps.hasFromClassCreator)
|
||||
FromClassCreator(system.dynamicAccess.getClassFor[Actor](proto.getProps.getFromClassCreator).get)
|
||||
else
|
||||
deserialize(proto.getProps.getCreator, classOf[() ⇒ Actor])
|
||||
}
|
||||
|
||||
val routerConfig =
|
||||
if (proto.getProps.hasRouterConfig) deserialize(proto.getProps.getRouterConfig, classOf[RouterConfig])
|
||||
|
|
@ -115,26 +112,22 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
|
|||
supervisor = deserializeActorRef(system, proto.getSupervisor))
|
||||
}
|
||||
|
||||
protected def serialize(any: AnyRef): ByteString =
|
||||
serialization.serialize(any) match {
|
||||
case Right(bytes) ⇒ ByteString.copyFrom(bytes)
|
||||
case Left(e) ⇒ throw e
|
||||
}
|
||||
protected def serialize(any: AnyRef): ByteString = ByteString.copyFrom(serialization.serialize(any).get)
|
||||
|
||||
protected def deserialize[T: ClassTag](data: ByteString, clazz: Class[T]): T = {
|
||||
val bytes = data.toByteArray
|
||||
serialization.deserialize(bytes, clazz) match {
|
||||
case Right(x: T) ⇒ x
|
||||
case Right(other) ⇒ throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other))
|
||||
case Left(e) ⇒
|
||||
case Success(x: T) ⇒ x
|
||||
case Success(other) ⇒ throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other))
|
||||
case Failure(e) ⇒
|
||||
// Fallback to the java serializer, because some interfaces don't implement java.io.Serializable,
|
||||
// but the impl instance does. This could be optimized by adding java serializers in reference.conf:
|
||||
// com.typesafe.config.Config
|
||||
// akka.routing.RouterConfig
|
||||
// akka.actor.Scope
|
||||
serialization.deserialize(bytes, classOf[java.io.Serializable]) match {
|
||||
case Right(x: T) ⇒ x
|
||||
case _ ⇒ throw e // the first exception
|
||||
case Success(x: T) ⇒ x
|
||||
case _ ⇒ throw e // the first exception
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,15 +79,7 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
def verifySerialization(msg: DaemonMsgCreate): Unit = {
|
||||
val bytes = ser.serialize(msg) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgCreate]) match {
|
||||
case Left(exception) ⇒ fail(exception)
|
||||
case Right(m: DaemonMsgCreate) ⇒ assertDaemonMsgCreate(msg, m)
|
||||
case other ⇒ throw new MatchError(other)
|
||||
}
|
||||
assertDaemonMsgCreate(msg, ser.deserialize(ser.serialize(msg).get, classOf[DaemonMsgCreate]).get.asInstanceOf[DaemonMsgCreate])
|
||||
}
|
||||
|
||||
def assertDaemonMsgCreate(expected: DaemonMsgCreate, got: DaemonMsgCreate): Unit = {
|
||||
|
|
|
|||
|
|
@ -136,14 +136,13 @@ object TestActorRef {
|
|||
def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)
|
||||
|
||||
def apply[T <: Actor](name: String)(implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({
|
||||
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()) match {
|
||||
case Right(value) ⇒ value
|
||||
case Left(exception) ⇒ throw ActorInitializationException(null,
|
||||
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()).recover({
|
||||
case exception ⇒ throw ActorInitializationException(null,
|
||||
"Could not instantiate Actor" +
|
||||
"\nMake sure Actor is NOT defined inside a class/trait," +
|
||||
"\nif so put it outside the class/trait, f.e. in a companion object," +
|
||||
"\nOR try to change: 'actorOf(Props[MyActor]' to 'actorOf(Props(new MyActor)'.", exception)
|
||||
}
|
||||
}).get
|
||||
}), name)
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue