diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 82cea61625..94560083ab 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -260,7 +260,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } yield b + "-" + c Await.result(future1, timeout.duration) must be("10-14") - assert(checkType(future1, manifest[String])) + assert(checkType(future1, scala.reflect.classTag[String])) intercept[ClassCastException] { Await.result(future2, timeout.duration) } system.stop(actor) } @@ -479,7 +479,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } })) - val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int]) + val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo scala.reflect.classTag[Int]) assert(Await.result(Future.sequence(oddFutures), timeout.duration).sum === 10000) system.stop(oddActor) @@ -939,9 +939,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f((future, message) ⇒ { future.value must be('defined) future.value.get must be('failure) - future.value.get match { - case Failure(f) ⇒ f.getMessage must be(message) - } + val Failure(f) = future.value.get + f.getMessage must be(message) }) } "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) } diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 90cf83a3a9..01181d57aa 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -87,50 +87,23 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) { } "serialize Address" in { - val b = serialize(addr) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Address]) match { - case Left(exception) ⇒ fail(exception) - case Right(add) ⇒ assert(add === addr) - } + assert(deserialize(serialize(addr).get, classOf[Address]).get === addr) } "serialize Person" in { - - val b = serialize(person) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Person]) match { - case Left(exception) ⇒ fail(exception) - case Right(p) ⇒ assert(p === person) - } + assert(deserialize(serialize(person).get, classOf[Person]).get === person) } "serialize record with default serializer" in { - val r = Record(100, person) - val b = serialize(r) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Record]) match { - case Left(exception) ⇒ fail(exception) - case Right(p) ⇒ assert(p === r) - } + assert(deserialize(serialize(r).get, classOf[Record]).get === r) } "not serialize ActorCell" in { val a = system.actorOf(Props(new Actor { def receive = { case o: ObjectOutputStream ⇒ - try { - o.writeObject(this) - } catch { - case _: NotSerializableException ⇒ testActor ! "pass" - } + try o.writeObject(this) catch { case _: NotSerializableException ⇒ testActor ! "pass" } } })) a ! new ObjectOutputStream(new ByteArrayOutputStream()) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 0d92b06dcb..3f96bd839c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -413,7 +413,7 @@ class LocalActorRefProvider( def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras private def guardianSupervisorStrategyConfigurator = - dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x ⇒ x) + dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).get /** * Overridable supervision strategy to be used by the “/user” guardian. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 7c227f3757..bce966b99e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -19,6 +19,7 @@ import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, R import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.cell.ChildrenContainer import scala.concurrent.util.FiniteDuration +import util.{ Failure, Success } object ActorSystem { @@ -540,10 +541,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classOf[Scheduler] -> scheduler, classOf[DynamicAccess] -> dynamicAccess) - dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments) match { - case Left(e) ⇒ throw e - case Right(p) ⇒ p - } + dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get } def deadLetters: ActorRef = provider.deadLetters @@ -678,13 +676,12 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, private def loadExtensions() { import scala.collection.JavaConversions._ settings.config.getStringList("akka.extensions") foreach { fqcn ⇒ - dynamicAccess.getObjectFor[AnyRef](fqcn).fold(_ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()), Right(_)) match { - case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()); - case Right(p: ExtensionId[_]) ⇒ registerExtension(p); - case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) - case Left(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn) + dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()) } match { + case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()) + case Success(p: ExtensionId[_]) ⇒ registerExtension(p) + case Success(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) + case Failure(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn) } - } } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index abe4c18a70..9ceebaacb3 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -156,15 +156,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) case fqn ⇒ val args = Seq(classOf[Config] -> deployment) - dynamicAccess.createInstanceFor[RouterConfig](fqn, args) match { - case Right(router) ⇒ router - case Left(exception) ⇒ - throw new IllegalArgumentException( - ("Cannot instantiate router [%s], defined in [%s], " + - "make sure it extends [akka.routing.RouterConfig] and has constructor with " + - "[com.typesafe.config.Config] parameter") - .format(fqn, key), exception) - } + dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ + case exception ⇒ throw new IllegalArgumentException( + ("Cannot instantiate router [%s], defined in [%s], " + + "make sure it extends [akka.routing.RouterConfig] and has constructor with " + + "[com.typesafe.config.Config] parameter") + .format(fqn, key), exception) + }).get } Some(Deploy(key, deployment, router, NoScopeGiven)) diff --git a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala index bc5bef5d61..c9d9336235 100644 --- a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala +++ b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala @@ -1,11 +1,12 @@ /** - * Copyright (C) 2009-2012 Typesafe Inc. + * CopySuccess (C) 2009-2012 Typesafe Inc. */ package akka.actor import scala.util.control.NonFatal import java.lang.reflect.InvocationTargetException import scala.reflect.ClassTag +import scala.util.Try /** * The DynamicAccess implementation is the class which is used for @@ -16,7 +17,6 @@ import scala.reflect.ClassTag * unless they are extending Akka in ways which go beyond simple Extensions. */ abstract class DynamicAccess { - /** * Convenience method which given a `Class[_]` object and a constructor description * will create a new instance of that class. @@ -25,23 +25,13 @@ abstract class DynamicAccess { * val obj = DynamicAccess.createInstanceFor(clazz, Seq(classOf[Config] -> config, classOf[String] -> name)) * }}} */ - def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = { - val types = args.map(_._1).toArray - val values = args.map(_._2).toArray - withErrorHandling { - val constructor = clazz.getDeclaredConstructor(types: _*) - constructor.setAccessible(true) - val obj = constructor.newInstance(values: _*).asInstanceOf[T] - val t = implicitly[ClassTag[T]].runtimeClass - if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(clazz + " is not a subtype of " + t)) - } - } + def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] /** - * Obtain a `Class[_]` object loaded with the right class loader (i.e. the one + * Obtain a `Class[_]` object loaded with the Success class loader (i.e. the one * returned by `classLoader`). */ - def getClassFor[T: ClassTag](fqcn: String): Either[Throwable, Class[_ <: T]] + def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] /** * Obtain an object conforming to the type T, which is expected to be @@ -50,35 +40,18 @@ abstract class DynamicAccess { * `args` argument. The exact usage of args depends on which type is requested, * see the relevant requesting code for details. */ - def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] + def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] /** * Obtain the Scala “object” instance for the given fully-qualified class name, if there is one. */ - def getObjectFor[T: ClassTag](fqcn: String): Either[Throwable, T] + def getObjectFor[T: ClassTag](fqcn: String): Try[T] /** * This is the class loader to be used in those special cases where the * other factory method are not applicable (e.g. when constructing a ClassLoaderBinaryInputStream). */ def classLoader: ClassLoader - - /** - * Caught exception is returned as Left(exception). - * Unwraps `InvocationTargetException` if its getTargetException is an `Exception`. - * Other `Throwable`, such as `Error` is thrown. - */ - @inline - final def withErrorHandling[T](body: ⇒ Either[Throwable, T]): Either[Throwable, T] = - try body catch { - case e: InvocationTargetException ⇒ - e.getTargetException match { - case NonFatal(t) ⇒ Left(t) - case t ⇒ throw t - } - case NonFatal(e) ⇒ Left(e) - } - } /** @@ -89,42 +62,41 @@ abstract class DynamicAccess { * by default. */ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess { - //FIXME switch to Scala Reflection for 2.10 - override def getClassFor[T: ClassTag](fqcn: String): Either[Throwable, Class[_ <: T]] = - try { + + override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] = + Try[Class[_ <: T]]({ val c = classLoader.loadClass(fqcn).asInstanceOf[Class[_ <: T]] val t = implicitly[ClassTag[T]].runtimeClass - if (t.isAssignableFrom(c)) Right(c) else Left(new ClassCastException(t + " is not assignable from " + c)) - } catch { - case NonFatal(e) ⇒ Left(e) - } - - override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = - getClassFor(fqcn).fold(Left(_), { c ⇒ - val types = args.map(_._1).toArray - val values = args.map(_._2).toArray - withErrorHandling { - val constructor = c.getDeclaredConstructor(types: _*) - constructor.setAccessible(true) - val obj = constructor.newInstance(values: _*) - val t = implicitly[ClassTag[T]].runtimeClass - if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(fqcn + " is not a subtype of " + t)) - } + if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c) }) - override def getObjectFor[T: ClassTag](fqcn: String): Either[Throwable, T] = { - getClassFor(fqcn).fold(Left(_), { c ⇒ - withErrorHandling { + override def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] = + Try { + val types = args.map(_._1).toArray + val values = args.map(_._2).toArray + val constructor = clazz.getDeclaredConstructor(types: _*) + constructor.setAccessible(true) + val obj = constructor.newInstance(values: _*) + val t = implicitly[ClassTag[T]].runtimeClass + if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t) + } recover { case i: InvocationTargetException if i.getTargetException ne null ⇒ throw i.getTargetException } + + override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] = + getClassFor(fqcn) flatMap { c ⇒ createInstanceFor(c, args) } + + override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = { + getClassFor(fqcn) flatMap { c ⇒ + Try { val module = c.getDeclaredField("MODULE$") module.setAccessible(true) val t = implicitly[ClassTag[T]].runtimeClass module.get(null) match { - case null ⇒ Left(new NullPointerException) - case x if !t.isInstance(x) ⇒ Left(new ClassCastException(fqcn + " is not a subtype of " + t)) - case x ⇒ Right(x.asInstanceOf[T]) + case null ⇒ throw new NullPointerException + case x if !t.isInstance(x) ⇒ throw new ClassCastException(fqcn + " is not a subtype of " + t) + case x: T ⇒ x } - } - }) + } recover { case i: InvocationTargetException if i.getTargetException ne null ⇒ throw i.getTargetException } + } } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/Extension.scala b/akka-actor/src/main/scala/akka/actor/Extension.scala index da0e7e6769..6fab4ceb07 100644 --- a/akka-actor/src/main/scala/akka/actor/Extension.scala +++ b/akka-actor/src/main/scala/akka/actor/Extension.scala @@ -98,9 +98,5 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassTag[T]) extends Ext def this(clazz: Class[T]) = this()(ClassTag(clazz)) override def lookup(): ExtensionId[T] = this - def createExtension(system: ExtendedActorSystem): T = - system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)) match { - case Left(ex) ⇒ throw ex - case Right(r) ⇒ r - } + def createExtension(system: ExtendedActorSystem): T = system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)).get } diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index 6481e14d90..4e59695cff 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -170,13 +170,7 @@ private[akka] trait Children { this: ActorCell ⇒ private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(cell.system) - ser.serialize(props.creator) match { - case Left(t) ⇒ throw t - case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match { - case Left(t) ⇒ throw t - case _ ⇒ //All good - } - } + ser.deserialize(ser.serialize(props.creator).get, props.creator.getClass).get } /* * in case we are currently terminating, fail external attachChild requests diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index f251ac4588..340195d1a6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -25,13 +25,7 @@ object Envelope { if (msg eq null) throw new InvalidMessageException("Message is null") if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(system) - ser.serialize(msg) match { //Verify serializability - case Left(t) ⇒ throw t - case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass) match { //Verify deserializability - case Left(t) ⇒ throw t - case _ ⇒ //All good - } - } + ser.deserialize(ser.serialize(msg).get, msg.getClass).get } new Envelope(message, sender) } @@ -426,14 +420,13 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit case "bounded" ⇒ new BoundedMailbox(prerequisites.settings, config) case fqcn ⇒ val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config) - prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match { - case Right(instance) ⇒ instance - case Left(exception) ⇒ + prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ + case exception ⇒ throw new IllegalArgumentException( ("Cannot instantiate MailboxType [%s], defined in [%s], " + "make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters") .format(fqcn, config.getString("id")), exception) - } + }).get } } @@ -445,13 +438,12 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit val args = Seq( classOf[Config] -> config, classOf[DispatcherPrerequisites] -> prerequisites) - prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args) match { - case Right(instance) ⇒ instance - case Left(exception) ⇒ throw new IllegalArgumentException( + prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({ + case exception ⇒ throw new IllegalArgumentException( ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s], make sure it has an accessible constructor with a [%s,%s] signature""") .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) - } + }).get } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 1e6dbc8546..125c400bb6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -148,15 +148,14 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites) case fqn ⇒ val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) - prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args) match { - case Right(configurator) ⇒ configurator - case Left(exception) ⇒ + prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({ + case exception ⇒ throw new IllegalArgumentException( ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " + "make sure it has constructor with [com.typesafe.config.Config] and " + "[akka.dispatch.DispatcherPrerequisites] parameters") .format(fqn, cfg.getString("id")), exception) - } + }).get } } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index cca9ee5588..2a15860b97 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -101,17 +101,13 @@ trait LoggingBus extends ActorEventBus { loggerName ← defaultLoggers if loggerName != StandardOutLogger.getClass.getName } yield { - try { - system.dynamicAccess.getClassFor[Actor](loggerName) match { - case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) - case Left(exception) ⇒ throw exception - } - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + loggerName + - "] due to [" + e.toString + "]", e) - } + system.dynamicAccess.getClassFor[Actor](loggerName).map({ + case actorClass ⇒ addLogger(system, actorClass, level, logName) + }).recover({ + case e ⇒ throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + loggerName + + "] due to [" + e.toString + "]", e) + }).get } guard.withGuard { loggers = myloggers diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index ee5e87466b..b9d6298784 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -5,7 +5,6 @@ package akka.serialization import akka.AkkaException -import scala.util.DynamicVariable import com.typesafe.config.Config import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess } import akka.event.Logging @@ -13,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.util.control.NonFatal import scala.collection.mutable.ArrayBuffer import java.io.NotSerializableException +import util.{ Try, DynamicVariable } object Serialization { @@ -56,9 +56,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration * to either an Array of Bytes or an Exception if one was thrown. */ - def serialize(o: AnyRef): Either[Throwable, Array[Byte]] = - try Right(findSerializerFor(o).toBinary(o)) - catch { case NonFatal(e) ⇒ Left(e) } + def serialize(o: AnyRef): Try[Array[Byte]] = Try(findSerializerFor(o).toBinary(o)) /** * Deserializes the given array of bytes using the specified serializer id, @@ -67,18 +65,14 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { */ def deserialize(bytes: Array[Byte], serializerId: Int, - clazz: Option[Class[_]]): Either[Throwable, AnyRef] = - try Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz)) - catch { case NonFatal(e) ⇒ Left(e) } + clazz: Option[Class[_]]): Try[AnyRef] = Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz)) /** * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. * You can specify an optional ClassLoader to load the object into. * Returns either the resulting object or an Exception if one was thrown. */ - def deserialize(bytes: Array[Byte], clazz: Class[_]): Either[Throwable, AnyRef] = - try Right(serializerFor(clazz).fromBinary(bytes, Some(clazz))) - catch { case NonFatal(e) ⇒ Left(e) } + def deserialize(bytes: Array[Byte], clazz: Class[_]): Try[AnyRef] = Try(serializerFor(clazz).fromBinary(bytes, Some(clazz))) /** * Returns the Serializer configured for the given object, returns the NullSerializer if it's null. @@ -128,28 +122,24 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * Tries to load the specified Serializer by the fully-qualified name; the actual * loading is performed by the system’s [[akka.actor.DynamicAccess]]. */ - def serializerOf(serializerFQN: String): Either[Throwable, Serializer] = - system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)).fold(_ ⇒ - system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()), Right(_)) + def serializerOf(serializerFQN: String): Try[Serializer] = + system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)) recoverWith { + case _ ⇒ system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()) + } /** * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) * By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer */ private val serializers: Map[String, Serializer] = - for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).fold(throw _, identity) + for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).get /** * bindings is a Seq of tuple representing the mapping from Class to Serializer. * It is primarily ordered by the most specific classes first, and secondly in the configured order. */ - private[akka] val bindings: Seq[ClassSerializer] = { - val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield { - val c = system.dynamicAccess.getClassFor[Any](k).fold(throw _, identity[Class[_]]) - (c, serializers(v)) - } - sort(configuredBindings) - } + private[akka] val bindings: Seq[ClassSerializer] = + sort(for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v))) /** * Sort so that subtypes always precede their supertypes, but without diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c76b637164..3b92f99741 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -46,9 +46,9 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { val failureDetector = { import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn } system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).fold( - e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString), - identity) + fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).recover({ + case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) + }).get } new Cluster(system, failureDetector) diff --git a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala index 690b0ad838..0ce5f87728 100644 --- a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala @@ -372,8 +372,8 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val f: Future[Result] = for { x ← ask(actorA, Request).mapTo[Int] // call pattern directly - s ← actorB ask Request mapTo manifest[String] // call by implicit conversion - d ← actorC ? Request mapTo manifest[Double] // call by symbolic name + s ← (actorB ask Request).mapTo[String] // call by implicit conversion + d ← (actorC ? Request).mapTo[Double] // call by symbolic name } yield Result(x, s, d) f pipeTo actorD // .. or .. diff --git a/akka-docs/scala/code/docs/routing/RouterDocSpec.scala b/akka-docs/scala/code/docs/routing/RouterDocSpec.scala index 972da038ca..bea1bf16f4 100644 --- a/akka-docs/scala/code/docs/routing/RouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/RouterDocSpec.scala @@ -6,7 +6,7 @@ package docs.routing import RouterDocSpec.MyActor import akka.testkit.AkkaSpec import akka.routing.RoundRobinRouter -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.{ ActorRef, Props, Actor } object RouterDocSpec { class MyActor extends Actor { diff --git a/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala b/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala index 8d8865dd44..6e77d7a843 100644 --- a/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala +++ b/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala @@ -49,12 +49,12 @@ object ZeromqDocSpec { val timestamp = System.currentTimeMillis // use akka SerializationExtension to convert to bytes - val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).fold(throw _, identity) + val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).get // the first frame is the topic, second is the message pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload))) // use akka SerializationExtension to convert to bytes - val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).fold(throw _, identity) + val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get // the first frame is the topic, second is the message pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload))) } @@ -71,18 +71,12 @@ object ZeromqDocSpec { def receive = { // the first frame is the topic, second is the message case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒ - ser.deserialize(m.payload(1), classOf[Heap]) match { - case Right(Heap(timestamp, used, max)) ⇒ - log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp))) - case Left(e) ⇒ throw e - } + val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get + log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp))) case m: ZMQMessage if m.firstFrameAsString == "health.load" ⇒ - ser.deserialize(m.payload(1), classOf[Load]) match { - case Right(Load(timestamp, loadAverage)) ⇒ - log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp))) - case Left(e) ⇒ throw e - } + val Load(timestamp, loadAverage) = ser.deserialize(m.payload(1), classOf[Load]).get + log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp))) } } //#logger @@ -97,13 +91,10 @@ object ZeromqDocSpec { def receive = { // the first frame is the topic, second is the message case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒ - ser.deserialize(m.payload(1), classOf[Heap]) match { - case Right(Heap(timestamp, used, max)) ⇒ - if ((used.toDouble / max) > 0.9) count += 1 - else count = 0 - if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max)) - case Left(e) ⇒ throw e - } + val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get + if ((used.toDouble / max) > 0.9) count += 1 + else count = 0 + if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max)) } } //#alerter diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index b2413da3f9..b459a48c98 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -20,16 +20,10 @@ private[akka] object MessageSerializer { * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message */ def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = { - val clazz = - if (messageProtocol.hasMessageManifest) { - system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8) - .fold(throw _, Some(_)) - } else None - SerializationExtension(system) - .deserialize(messageProtocol.getMessage.toByteArray, messageProtocol.getSerializerId, clazz) match { - case Left(e) ⇒ throw e - case Right(r) ⇒ r - } + SerializationExtension(system).deserialize( + messageProtocol.getMessage.toByteArray, + messageProtocol.getSerializerId, + if (messageProtocol.hasMessageManifest) Some(system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8).get) else None).get } /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5377013a42..b118279ae1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -72,10 +72,9 @@ class RemoteActorRefProvider( classOf[ExtendedActorSystem] -> system, classOf[RemoteActorRefProvider] -> this) - system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match { - case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) - case Right(remote) ⇒ remote - } + system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args).recover({ + case problem ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) + }).get } _log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")") diff --git a/akka-remote/src/main/scala/akka/serialization/DaemonMsgCreateSerializer.scala b/akka-remote/src/main/scala/akka/serialization/DaemonMsgCreateSerializer.scala index 8f25021253..19aabd398f 100644 --- a/akka-remote/src/main/scala/akka/serialization/DaemonMsgCreateSerializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/DaemonMsgCreateSerializer.scala @@ -13,6 +13,7 @@ import akka.remote.RemoteProtocol.{ DaemonMsgCreateProtocol, DeployProtocol, Pro import akka.routing.{ NoRouter, RouterConfig } import akka.actor.FromClassCreator import scala.reflect.ClassTag +import util.{ Failure, Success } /** * Serializes akka's internal DaemonMsgCreate using protobuf @@ -88,14 +89,10 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e def props = { val creator = - if (proto.getProps.hasFromClassCreator) { - system.dynamicAccess.getClassFor[Actor](proto.getProps.getFromClassCreator) match { - case Right(clazz) ⇒ FromClassCreator(clazz) - case Left(e) ⇒ throw e - } - } else { + if (proto.getProps.hasFromClassCreator) + FromClassCreator(system.dynamicAccess.getClassFor[Actor](proto.getProps.getFromClassCreator).get) + else deserialize(proto.getProps.getCreator, classOf[() ⇒ Actor]) - } val routerConfig = if (proto.getProps.hasRouterConfig) deserialize(proto.getProps.getRouterConfig, classOf[RouterConfig]) @@ -115,26 +112,22 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e supervisor = deserializeActorRef(system, proto.getSupervisor)) } - protected def serialize(any: AnyRef): ByteString = - serialization.serialize(any) match { - case Right(bytes) ⇒ ByteString.copyFrom(bytes) - case Left(e) ⇒ throw e - } + protected def serialize(any: AnyRef): ByteString = ByteString.copyFrom(serialization.serialize(any).get) protected def deserialize[T: ClassTag](data: ByteString, clazz: Class[T]): T = { val bytes = data.toByteArray serialization.deserialize(bytes, clazz) match { - case Right(x: T) ⇒ x - case Right(other) ⇒ throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other)) - case Left(e) ⇒ + case Success(x: T) ⇒ x + case Success(other) ⇒ throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other)) + case Failure(e) ⇒ // Fallback to the java serializer, because some interfaces don't implement java.io.Serializable, // but the impl instance does. This could be optimized by adding java serializers in reference.conf: // com.typesafe.config.Config // akka.routing.RouterConfig // akka.actor.Scope serialization.deserialize(bytes, classOf[java.io.Serializable]) match { - case Right(x: T) ⇒ x - case _ ⇒ throw e // the first exception + case Success(x: T) ⇒ x + case _ ⇒ throw e // the first exception } } } diff --git a/akka-remote/src/test/scala/akka/serialization/DaemonMsgCreateSerializerSpec.scala b/akka-remote/src/test/scala/akka/serialization/DaemonMsgCreateSerializerSpec.scala index 0c92ec7ee1..38a50ea886 100644 --- a/akka-remote/src/test/scala/akka/serialization/DaemonMsgCreateSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/serialization/DaemonMsgCreateSerializerSpec.scala @@ -79,15 +79,7 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec { } def verifySerialization(msg: DaemonMsgCreate): Unit = { - val bytes = ser.serialize(msg) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgCreate]) match { - case Left(exception) ⇒ fail(exception) - case Right(m: DaemonMsgCreate) ⇒ assertDaemonMsgCreate(msg, m) - case other ⇒ throw new MatchError(other) - } + assertDaemonMsgCreate(msg, ser.deserialize(ser.serialize(msg).get, classOf[DaemonMsgCreate]).get.asInstanceOf[DaemonMsgCreate]) } def assertDaemonMsgCreate(expected: DaemonMsgCreate, got: DaemonMsgCreate): Unit = { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 33559cd56c..57aedba34f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -136,14 +136,13 @@ object TestActorRef { def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) def apply[T <: Actor](name: String)(implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({ - system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()) match { - case Right(value) ⇒ value - case Left(exception) ⇒ throw ActorInitializationException(null, + system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()).recover({ + case exception ⇒ throw ActorInitializationException(null, "Could not instantiate Actor" + "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf(Props[MyActor]' to 'actorOf(Props(new MyActor)'.", exception) - } + }).get }), name) /**