Merge branch 'master' into wip-2473-npe-subscribe-patriknw

Conflicts:
	akka-cluster/src/main/scala/akka/cluster/Cluster.scala
This commit is contained in:
Patrik Nordwall 2012-09-06 22:02:23 +02:00
commit 6dd0d736f7
27 changed files with 143 additions and 256 deletions

View file

@ -79,6 +79,8 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi
intercept[InvalidActorNameException](system.actorOf(Props.empty, "")).getMessage.contains("empty") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "$hallo")).getMessage.contains("conform") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "a%")).getMessage.contains("conform") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "%3")).getMessage.contains("conform") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "%1t")).getMessage.contains("conform") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "a?")).getMessage.contains("conform") must be(true)
intercept[InvalidActorNameException](system.actorOf(Props.empty, "üß")).getMessage.contains("conform") must be(true)
}

View file

@ -20,7 +20,7 @@ import scala.runtime.NonLocalReturnControl
import akka.pattern.ask
import java.lang.{ IllegalStateException, ArithmeticException }
import java.util.concurrent._
import scala.reflect.ClassTag
import scala.reflect.{ ClassTag, classTag }
import scala.util.{ Failure, Success, Try }
object FutureSpec {
@ -260,7 +260,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
} yield b + "-" + c
Await.result(future1, timeout.duration) must be("10-14")
assert(checkType(future1, manifest[String]))
assert(checkType(future1, classTag[String]))
intercept[ClassCastException] { Await.result(future2, timeout.duration) }
system.stop(actor)
}
@ -479,7 +479,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
}))
val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int])
val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo classTag[Int])
assert(Await.result(Future.sequence(oddFutures), timeout.duration).sum === 10000)
system.stop(oddActor)
@ -939,9 +939,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
f((future, message) {
future.value must be('defined)
future.value.get must be('failure)
future.value.get match {
case Failure(f) f.getMessage must be(message)
}
val Failure(f) = future.value.get
f.getMessage must be(message)
})
}
"throw exception with 'get'" in { f((future, message) (evaluating { Await.result(future, timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) }

View file

@ -87,50 +87,23 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
}
"serialize Address" in {
val b = serialize(addr) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
deserialize(b.asInstanceOf[Array[Byte]], classOf[Address]) match {
case Left(exception) fail(exception)
case Right(add) assert(add === addr)
}
assert(deserialize(serialize(addr).get, classOf[Address]).get === addr)
}
"serialize Person" in {
val b = serialize(person) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
deserialize(b.asInstanceOf[Array[Byte]], classOf[Person]) match {
case Left(exception) fail(exception)
case Right(p) assert(p === person)
}
assert(deserialize(serialize(person).get, classOf[Person]).get === person)
}
"serialize record with default serializer" in {
val r = Record(100, person)
val b = serialize(r) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
deserialize(b.asInstanceOf[Array[Byte]], classOf[Record]) match {
case Left(exception) fail(exception)
case Right(p) assert(p === r)
}
assert(deserialize(serialize(r).get, classOf[Record]).get === r)
}
"not serialize ActorCell" in {
val a = system.actorOf(Props(new Actor {
def receive = {
case o: ObjectOutputStream
try {
o.writeObject(this)
} catch {
case _: NotSerializableException testActor ! "pass"
}
try o.writeObject(this) catch { case _: NotSerializableException testActor ! "pass" }
}
}))
a ! new ObjectOutputStream(new ByteArrayOutputStream())

View file

@ -19,7 +19,7 @@ object ActorPath {
* Since Actors form a tree, it is addressable using an URL, therefor an Actor Name has to conform to:
* http://www.ietf.org/rfc/rfc2396.txt
*/
val ElementRegex = """[-\w:@&=+,.!~*'_;][-\w:@&=+,.!~*'$_;]*""".r
val ElementRegex = """(?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})*""".r
}
/**

View file

@ -413,7 +413,7 @@ class LocalActorRefProvider(
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
private def guardianSupervisorStrategyConfigurator =
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x x)
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).get
/**
* Overridable supervision strategy to be used by the /user guardian.

View file

@ -19,6 +19,7 @@ import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, R
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.cell.ChildrenContainer
import scala.concurrent.util.FiniteDuration
import util.{ Failure, Success }
object ActorSystem {
@ -540,10 +541,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
classOf[Scheduler] -> scheduler,
classOf[DynamicAccess] -> dynamicAccess)
dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments) match {
case Left(e) throw e
case Right(p) p
}
dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get
}
def deadLetters: ActorRef = provider.deadLetters
@ -678,13 +676,12 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
private def loadExtensions() {
import scala.collection.JavaConversions._
settings.config.getStringList("akka.extensions") foreach { fqcn
dynamicAccess.getObjectFor[AnyRef](fqcn).fold(_ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()), Right(_)) match {
case Right(p: ExtensionIdProvider) registerExtension(p.lookup());
case Right(p: ExtensionId[_]) registerExtension(p);
case Right(other) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
case Left(problem) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()) } match {
case Success(p: ExtensionIdProvider) registerExtension(p.lookup())
case Success(p: ExtensionId[_]) registerExtension(p)
case Success(other) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
case Failure(problem) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
}
}
}

View file

@ -156,15 +156,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer)
case fqn
val args = Seq(classOf[Config] -> deployment)
dynamicAccess.createInstanceFor[RouterConfig](fqn, args) match {
case Right(router) router
case Left(exception)
throw new IllegalArgumentException(
("Cannot instantiate router [%s], defined in [%s], " +
"make sure it extends [akka.routing.RouterConfig] and has constructor with " +
"[com.typesafe.config.Config] parameter")
.format(fqn, key), exception)
}
dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({
case exception throw new IllegalArgumentException(
("Cannot instantiate router [%s], defined in [%s], " +
"make sure it extends [akka.routing.RouterConfig] and has constructor with " +
"[com.typesafe.config.Config] parameter")
.format(fqn, key), exception)
}).get
}
Some(Deploy(key, deployment, router, NoScopeGiven))

View file

@ -6,6 +6,7 @@ package akka.actor
import scala.util.control.NonFatal
import java.lang.reflect.InvocationTargetException
import scala.reflect.ClassTag
import scala.util.Try
/**
* The DynamicAccess implementation is the class which is used for
@ -16,7 +17,6 @@ import scala.reflect.ClassTag
* unless they are extending Akka in ways which go beyond simple Extensions.
*/
abstract class DynamicAccess {
/**
* Convenience method which given a `Class[_]` object and a constructor description
* will create a new instance of that class.
@ -25,23 +25,13 @@ abstract class DynamicAccess {
* val obj = DynamicAccess.createInstanceFor(clazz, Seq(classOf[Config] -> config, classOf[String] -> name))
* }}}
*/
def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = {
val types = args.map(_._1).toArray
val values = args.map(_._2).toArray
withErrorHandling {
val constructor = clazz.getDeclaredConstructor(types: _*)
constructor.setAccessible(true)
val obj = constructor.newInstance(values: _*).asInstanceOf[T]
val t = implicitly[ClassTag[T]].runtimeClass
if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(clazz + " is not a subtype of " + t))
}
}
def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T]
/**
* Obtain a `Class[_]` object loaded with the right class loader (i.e. the one
* returned by `classLoader`).
*/
def getClassFor[T: ClassTag](fqcn: String): Either[Throwable, Class[_ <: T]]
def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]]
/**
* Obtain an object conforming to the type T, which is expected to be
@ -50,35 +40,18 @@ abstract class DynamicAccess {
* `args` argument. The exact usage of args depends on which type is requested,
* see the relevant requesting code for details.
*/
def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T]
def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T]
/**
* Obtain the Scala object instance for the given fully-qualified class name, if there is one.
*/
def getObjectFor[T: ClassTag](fqcn: String): Either[Throwable, T]
def getObjectFor[T: ClassTag](fqcn: String): Try[T]
/**
* This is the class loader to be used in those special cases where the
* other factory method are not applicable (e.g. when constructing a ClassLoaderBinaryInputStream).
*/
def classLoader: ClassLoader
/**
* Caught exception is returned as Left(exception).
* Unwraps `InvocationTargetException` if its getTargetException is an `Exception`.
* Other `Throwable`, such as `Error` is thrown.
*/
@inline
final def withErrorHandling[T](body: Either[Throwable, T]): Either[Throwable, T] =
try body catch {
case e: InvocationTargetException
e.getTargetException match {
case NonFatal(t) Left(t)
case t throw t
}
case NonFatal(e) Left(e)
}
}
/**
@ -89,42 +62,41 @@ abstract class DynamicAccess {
* by default.
*/
class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {
//FIXME switch to Scala Reflection for 2.10
override def getClassFor[T: ClassTag](fqcn: String): Either[Throwable, Class[_ <: T]] =
try {
override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] =
Try[Class[_ <: T]]({
val c = classLoader.loadClass(fqcn).asInstanceOf[Class[_ <: T]]
val t = implicitly[ClassTag[T]].runtimeClass
if (t.isAssignableFrom(c)) Right(c) else Left(new ClassCastException(t + " is not assignable from " + c))
} catch {
case NonFatal(e) Left(e)
}
override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] =
getClassFor(fqcn).fold(Left(_), { c
val types = args.map(_._1).toArray
val values = args.map(_._2).toArray
withErrorHandling {
val constructor = c.getDeclaredConstructor(types: _*)
constructor.setAccessible(true)
val obj = constructor.newInstance(values: _*)
val t = implicitly[ClassTag[T]].runtimeClass
if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(fqcn + " is not a subtype of " + t))
}
if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c)
})
override def getObjectFor[T: ClassTag](fqcn: String): Either[Throwable, T] = {
getClassFor(fqcn).fold(Left(_), { c
withErrorHandling {
override def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] =
Try {
val types = args.map(_._1).toArray
val values = args.map(_._2).toArray
val constructor = clazz.getDeclaredConstructor(types: _*)
constructor.setAccessible(true)
val obj = constructor.newInstance(values: _*)
val t = implicitly[ClassTag[T]].runtimeClass
if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
} recover { case i: InvocationTargetException if i.getTargetException ne null throw i.getTargetException }
override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] =
getClassFor(fqcn) flatMap { c createInstanceFor(c, args) }
override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = {
getClassFor(fqcn) flatMap { c
Try {
val module = c.getDeclaredField("MODULE$")
module.setAccessible(true)
val t = implicitly[ClassTag[T]].runtimeClass
module.get(null) match {
case null Left(new NullPointerException)
case x if !t.isInstance(x) Left(new ClassCastException(fqcn + " is not a subtype of " + t))
case x Right(x.asInstanceOf[T])
case null throw new NullPointerException
case x if !t.isInstance(x) throw new ClassCastException(fqcn + " is not a subtype of " + t)
case x: T x
}
}
})
} recover { case i: InvocationTargetException if i.getTargetException ne null throw i.getTargetException }
}
}
}

View file

@ -98,9 +98,5 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassTag[T]) extends Ext
def this(clazz: Class[T]) = this()(ClassTag(clazz))
override def lookup(): ExtensionId[T] = this
def createExtension(system: ExtendedActorSystem): T =
system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)) match {
case Left(ex) throw ex
case Right(r) r
}
def createExtension(system: ExtendedActorSystem): T = system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)).get
}

View file

@ -170,13 +170,7 @@ private[akka] trait Children { this: ActorCell ⇒
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = {
if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(cell.system)
ser.serialize(props.creator) match {
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, props.creator.getClass) match {
case Left(t) throw t
case _ //All good
}
}
ser.deserialize(ser.serialize(props.creator).get, props.creator.getClass).get
}
/*
* in case we are currently terminating, fail external attachChild requests

View file

@ -25,13 +25,7 @@ object Envelope {
if (msg eq null) throw new InvalidMessageException("Message is null")
if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(system)
ser.serialize(msg) match { //Verify serializability
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, msg.getClass) match { //Verify deserializability
case Left(t) throw t
case _ //All good
}
}
ser.deserialize(ser.serialize(msg).get, msg.getClass).get
}
new Envelope(message, sender)
}
@ -426,14 +420,13 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
case "bounded" new BoundedMailbox(prerequisites.settings, config)
case fqcn
val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config)
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
case Right(instance) instance
case Left(exception)
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
case exception
throw new IllegalArgumentException(
("Cannot instantiate MailboxType [%s], defined in [%s], " +
"make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters")
.format(fqcn, config.getString("id")), exception)
}
}).get
}
}
@ -445,13 +438,12 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
val args = Seq(
classOf[Config] -> config,
classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args) match {
case Right(instance) instance
case Left(exception) throw new IllegalArgumentException(
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
case exception throw new IllegalArgumentException(
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature""")
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
}
}).get
}
}
}

View file

@ -148,15 +148,14 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
case "PinnedDispatcher" new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn
val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args) match {
case Right(configurator) configurator
case Left(exception)
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
case exception
throw new IllegalArgumentException(
("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
"make sure it has constructor with [com.typesafe.config.Config] and " +
"[akka.dispatch.DispatcherPrerequisites] parameters")
.format(fqn, cfg.getString("id")), exception)
}
}).get
}
}
}

View file

@ -101,17 +101,13 @@ trait LoggingBus extends ActorEventBus {
loggerName defaultLoggers
if loggerName != StandardOutLogger.getClass.getName
} yield {
try {
system.dynamicAccess.getClassFor[Actor](loggerName) match {
case Right(actorClass) addLogger(system, actorClass, level, logName)
case Left(exception) throw exception
}
} catch {
case e: Exception
throw new ConfigurationException(
"Event Handler specified in config can't be loaded [" + loggerName +
"] due to [" + e.toString + "]", e)
}
system.dynamicAccess.getClassFor[Actor](loggerName).map({
case actorClass addLogger(system, actorClass, level, logName)
}).recover({
case e throw new ConfigurationException(
"Event Handler specified in config can't be loaded [" + loggerName +
"] due to [" + e.toString + "]", e)
}).get
}
guard.withGuard {
loggers = myloggers

View file

@ -5,7 +5,6 @@
package akka.serialization
import akka.AkkaException
import scala.util.DynamicVariable
import com.typesafe.config.Config
import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess }
import akka.event.Logging
@ -13,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.util.control.NonFatal
import scala.collection.mutable.ArrayBuffer
import java.io.NotSerializableException
import util.{ Try, DynamicVariable }
object Serialization {
@ -56,9 +56,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
* to either an Array of Bytes or an Exception if one was thrown.
*/
def serialize(o: AnyRef): Either[Throwable, Array[Byte]] =
try Right(findSerializerFor(o).toBinary(o))
catch { case NonFatal(e) Left(e) }
def serialize(o: AnyRef): Try[Array[Byte]] = Try(findSerializerFor(o).toBinary(o))
/**
* Deserializes the given array of bytes using the specified serializer id,
@ -67,18 +65,14 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
*/
def deserialize(bytes: Array[Byte],
serializerId: Int,
clazz: Option[Class[_]]): Either[Throwable, AnyRef] =
try Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
catch { case NonFatal(e) Left(e) }
clazz: Option[Class[_]]): Try[AnyRef] = Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz))
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* You can specify an optional ClassLoader to load the object into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(bytes: Array[Byte], clazz: Class[_]): Either[Throwable, AnyRef] =
try Right(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
catch { case NonFatal(e) Left(e) }
def deserialize(bytes: Array[Byte], clazz: Class[_]): Try[AnyRef] = Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)))
/**
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null.
@ -128,28 +122,24 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* Tries to load the specified Serializer by the fully-qualified name; the actual
* loading is performed by the systems [[akka.actor.DynamicAccess]].
*/
def serializerOf(serializerFQN: String): Either[Throwable, Serializer] =
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)).fold(_
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()), Right(_))
def serializerOf(serializerFQN: String): Try[Serializer] =
system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)) recoverWith {
case _ system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq())
}
/**
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer
*/
private val serializers: Map[String, Serializer] =
for ((k: String, v: String) settings.Serializers) yield k -> serializerOf(v).fold(throw _, identity)
for ((k: String, v: String) settings.Serializers) yield k -> serializerOf(v).get
/**
* bindings is a Seq of tuple representing the mapping from Class to Serializer.
* It is primarily ordered by the most specific classes first, and secondly in the configured order.
*/
private[akka] val bindings: Seq[ClassSerializer] = {
val configuredBindings = for ((k: String, v: String) settings.SerializationBindings if v != "none") yield {
val c = system.dynamicAccess.getClassFor[Any](k).fold(throw _, identity[Class[_]])
(c, serializers(v))
}
sort(configuredBindings)
}
private[akka] val bindings: Seq[ClassSerializer] =
sort(for ((k: String, v: String) settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v)))
/**
* Sort so that subtypes always precede their supertypes, but without

View file

@ -79,9 +79,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
val failureDetector = {
import settings.{ FailureDetectorImplementationClass fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).fold(
e throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString),
identity)
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({
case e throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
}).get
}
// ========================================================

View file

@ -81,7 +81,8 @@ a top level actor, that is supervised by the system (internal guardian actor).
The name parameter is optional, but you should preferably name your actors, since
that is used in log messages and for identifying actors. The name must not be empty
or start with ``$``. If the given name is already in use by another child to the
or start with ``$``, but it may contain URL encoded characters (eg. ``%20`` for a blank space).
If the given name is already in use by another child to the
same parent actor an `InvalidActorNameException` is thrown.
Actors are automatically started asynchronously when created.

View file

@ -181,6 +181,18 @@ v2.1::
}
}, ec);
API changes of DynamicAccess
============================
All methods with scala.Either[Throwable, X] have been changed to used scala.util.Try[X].
DynamicAccess.withErrorHandling has been removed since scala.util.Try now fulfills that role.
API changes of Serialization
============================
All methods with scala.Either[Throwable, X] have been changed to used scala.util.Try[X].
Empty Props
===========

View file

@ -75,7 +75,8 @@ a top level actor, that is supervised by the system (internal guardian actor).
The name parameter is optional, but you should preferably name your actors, since
that is used in log messages and for identifying actors. The name must not be empty
or start with ``$``. If the given name is already in use by another child to the
or start with ``$``, but it may contain URL encoded characters (eg. ``%20`` for a blank space).
If the given name is already in use by another child to the
same parent actor an `InvalidActorNameException` is thrown.
Actors are automatically started asynchronously when created.

View file

@ -372,8 +372,8 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
val f: Future[Result] =
for {
x ask(actorA, Request).mapTo[Int] // call pattern directly
s actorB ask Request mapTo manifest[String] // call by implicit conversion
d actorC ? Request mapTo manifest[Double] // call by symbolic name
s (actorB ask Request).mapTo[String] // call by implicit conversion
d (actorC ? Request).mapTo[Double] // call by symbolic name
} yield Result(x, s, d)
f pipeTo actorD // .. or ..

View file

@ -6,7 +6,7 @@ package docs.routing
import RouterDocSpec.MyActor
import akka.testkit.AkkaSpec
import akka.routing.RoundRobinRouter
import akka.actor.{ActorRef, Props, Actor}
import akka.actor.{ ActorRef, Props, Actor }
object RouterDocSpec {
class MyActor extends Actor {

View file

@ -49,12 +49,12 @@ object ZeromqDocSpec {
val timestamp = System.currentTimeMillis
// use akka SerializationExtension to convert to bytes
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).fold(throw _, identity)
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).get
// the first frame is the topic, second is the message
pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload)))
// use akka SerializationExtension to convert to bytes
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).fold(throw _, identity)
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get
// the first frame is the topic, second is the message
pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload)))
}
@ -71,18 +71,12 @@ object ZeromqDocSpec {
def receive = {
// the first frame is the topic, second is the message
case m: ZMQMessage if m.firstFrameAsString == "health.heap"
ser.deserialize(m.payload(1), classOf[Heap]) match {
case Right(Heap(timestamp, used, max))
log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
case Left(e) throw e
}
val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get
log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
case m: ZMQMessage if m.firstFrameAsString == "health.load"
ser.deserialize(m.payload(1), classOf[Load]) match {
case Right(Load(timestamp, loadAverage))
log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
case Left(e) throw e
}
val Load(timestamp, loadAverage) = ser.deserialize(m.payload(1), classOf[Load]).get
log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
}
}
//#logger
@ -97,13 +91,10 @@ object ZeromqDocSpec {
def receive = {
// the first frame is the topic, second is the message
case m: ZMQMessage if m.firstFrameAsString == "health.heap"
ser.deserialize(m.payload(1), classOf[Heap]) match {
case Right(Heap(timestamp, used, max))
if ((used.toDouble / max) > 0.9) count += 1
else count = 0
if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max))
case Left(e) throw e
}
val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get
if ((used.toDouble / max) > 0.9) count += 1
else count = 0
if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max))
}
}
//#alerter

View file

@ -20,16 +20,10 @@ private[akka] object MessageSerializer {
* Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message
*/
def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = {
val clazz =
if (messageProtocol.hasMessageManifest) {
system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8)
.fold(throw _, Some(_))
} else None
SerializationExtension(system)
.deserialize(messageProtocol.getMessage.toByteArray, messageProtocol.getSerializerId, clazz) match {
case Left(e) throw e
case Right(r) r
}
SerializationExtension(system).deserialize(
messageProtocol.getMessage.toByteArray,
messageProtocol.getSerializerId,
if (messageProtocol.hasMessageManifest) Some(system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8).get) else None).get
}
/**

View file

@ -72,10 +72,9 @@ class RemoteActorRefProvider(
classOf[ExtendedActorSystem] -> system,
classOf[RemoteActorRefProvider] -> this)
system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match {
case Left(problem) throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem)
case Right(remote) remote
}
system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args).recover({
case problem throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem)
}).get
}
_log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")")

View file

@ -13,6 +13,7 @@ import akka.remote.RemoteProtocol.{ DaemonMsgCreateProtocol, DeployProtocol, Pro
import akka.routing.{ NoRouter, RouterConfig }
import akka.actor.FromClassCreator
import scala.reflect.ClassTag
import util.{ Failure, Success }
/**
* Serializes akka's internal DaemonMsgCreate using protobuf
@ -88,14 +89,10 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
def props = {
val creator =
if (proto.getProps.hasFromClassCreator) {
system.dynamicAccess.getClassFor[Actor](proto.getProps.getFromClassCreator) match {
case Right(clazz) FromClassCreator(clazz)
case Left(e) throw e
}
} else {
if (proto.getProps.hasFromClassCreator)
FromClassCreator(system.dynamicAccess.getClassFor[Actor](proto.getProps.getFromClassCreator).get)
else
deserialize(proto.getProps.getCreator, classOf[() Actor])
}
val routerConfig =
if (proto.getProps.hasRouterConfig) deserialize(proto.getProps.getRouterConfig, classOf[RouterConfig])
@ -115,26 +112,22 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e
supervisor = deserializeActorRef(system, proto.getSupervisor))
}
protected def serialize(any: AnyRef): ByteString =
serialization.serialize(any) match {
case Right(bytes) ByteString.copyFrom(bytes)
case Left(e) throw e
}
protected def serialize(any: AnyRef): ByteString = ByteString.copyFrom(serialization.serialize(any).get)
protected def deserialize[T: ClassTag](data: ByteString, clazz: Class[T]): T = {
val bytes = data.toByteArray
serialization.deserialize(bytes, clazz) match {
case Right(x: T) x
case Right(other) throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other))
case Left(e)
case Success(x: T) x
case Success(other) throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other))
case Failure(e)
// Fallback to the java serializer, because some interfaces don't implement java.io.Serializable,
// but the impl instance does. This could be optimized by adding java serializers in reference.conf:
// com.typesafe.config.Config
// akka.routing.RouterConfig
// akka.actor.Scope
serialization.deserialize(bytes, classOf[java.io.Serializable]) match {
case Right(x: T) x
case _ throw e // the first exception
case Success(x: T) x
case _ throw e // the first exception
}
}
}

View file

@ -79,15 +79,7 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec {
}
def verifySerialization(msg: DaemonMsgCreate): Unit = {
val bytes = ser.serialize(msg) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgCreate]) match {
case Left(exception) fail(exception)
case Right(m: DaemonMsgCreate) assertDaemonMsgCreate(msg, m)
case other throw new MatchError(other)
}
assertDaemonMsgCreate(msg, ser.deserialize(ser.serialize(msg).get, classOf[DaemonMsgCreate]).get.asInstanceOf[DaemonMsgCreate])
}
def assertDaemonMsgCreate(expected: DaemonMsgCreate, got: DaemonMsgCreate): Unit = {

View file

@ -136,14 +136,13 @@ object TestActorRef {
def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)
def apply[T <: Actor](name: String)(implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()) match {
case Right(value) value
case Left(exception) throw ActorInitializationException(null,
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()).recover({
case exception throw ActorInitializationException(null,
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf(Props[MyActor]' to 'actorOf(Props(new MyActor)'.", exception)
}
}).get
}), name)
/**

View file

@ -31,7 +31,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
private val noBytes = Array[Byte]()
private val zmqContext = params collectFirst { case c: Context c } getOrElse DefaultContext
private val deserializer = deserializerFromParams
private var deserializer = params collectFirst { case d: Deserializer d } getOrElse new ZMQMessageDeserializer
private val socketType = {
import SocketType.{ ZMQSocketType ST }
params.collectFirst { case t: ST t }.getOrElse(throw new IllegalArgumentException("A socket type is required"))
@ -39,7 +39,6 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
private val socket: Socket = zmqContext.socket(socketType)
private val poller: Poller = zmqContext.poller
private val log = Logging(context.system, this)
private val pendingSends = new ListBuffer[Seq[Frame]]
@ -93,6 +92,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
case MulticastHops(value) socket.setMulticastHops(value)
case SendBufferSize(value) socket.setSendBufferSize(value)
case ReceiveBufferSize(value) socket.setReceiveBufferSize(value)
case d: Deserializer deserializer = d
}
private def handleSocketOptionQuery(msg: SocketOptionQuery): Unit =
@ -135,9 +135,6 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
params filter (_.isInstanceOf[PubSubOption]) foreach { self ! _ }
}
private def deserializerFromParams: Deserializer =
params collectFirst { case d: Deserializer d } getOrElse new ZMQMessageDeserializer
private def setupSocket() = params foreach {
case _: SocketConnectOption | _: PubSubOption | _: SocketMeta // ignore, handled differently
case m self ! m