From a607fbd517e79355ae1f6c6d40e1482a3fc0ac46 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 6 Feb 2012 12:10:37 +0100 Subject: [PATCH 01/17] Adding type parameter to onComplete for better type inference. Making complete throw exception on race. Minor code improvements inside Future --- .../src/main/scala/akka/dispatch/Future.scala | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index c781595aeb..e3821e8334 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -14,12 +14,12 @@ import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Duration, BoxedType } -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger } import akka.dispatch.Await.CanAwait import java.util.concurrent._ import akka.util.NonFatal import akka.event.Logging.LogEventException import akka.event.Logging.Debug +import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } object Await { @@ -192,7 +192,7 @@ object Future { def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val futureResult = Promise[T]() - val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult complete _ + val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult tryComplete _ futures.foreach(_ onComplete completeFirst) futureResult @@ -208,12 +208,12 @@ object Future { val ref = new AtomicInteger(futures.size) val search: Either[Throwable, T] ⇒ Unit = v ⇒ try { v match { - case Right(r) ⇒ if (predicate(r)) result success Some(r) + case Right(r) ⇒ if (predicate(r)) result tryComplete Right(Some(r)) case _ ⇒ } } finally { if (ref.decrementAndGet == 0) - result success None + result tryComplete Right(None) } futures.foreach(_ onComplete search) @@ -279,13 +279,13 @@ object Future { * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { - val future = Promise[A] + val p = Promise[A] dispatchTask({ () ⇒ - (reify(body) foreachFull (future success, future failure): Future[Any]) onFailure { - case e: Exception ⇒ future failure e + (reify(body) foreachFull (p success, p failure): Future[Any]) onFailure { + case NonFatal(e) ⇒ p tryComplete Left(e) } }, true) - future + p.future } /** @@ -379,7 +379,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { case Left(t) ⇒ p failure t case Right(r) ⇒ that onSuccess { case r2 ⇒ p success ((r, r2)) } } - that onFailure { case f ⇒ p failure f } + that onFailure { case f ⇒ p tryComplete Left(f) } p.future } @@ -411,7 +411,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { * callbacks may be registered; there is no guarantee that they will be * executed in a particular order. 
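* Illustrative sketch (a `future` value is assumed to be in scope; results use the
* Either[Throwable, T] shape of this API):
* {{{
* future onComplete {
*   case Right(result) ⇒ println("got " + result)
*   case Left(failure) ⇒ println("failed: " + failure)
* }
* }}}
* The callback may now return any type U, which helps type inference at call sites;
* relatedly, `complete` now throws IllegalStateException on a second completion,
* while `tryComplete` still just returns false.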
*/ - def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type + def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type /** * When the future is completed with a valid result, apply the provided @@ -483,7 +483,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { val p = Promise[A]() onComplete { - case Left(e) if pf isDefinedAt e ⇒ p.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) + case Left(e) if pf isDefinedAt e ⇒ p.complete(try { Right(pf(e)) } catch { case NonFatal(x) ⇒ Left(x) }) case otherwise ⇒ p complete otherwise } p.future @@ -699,9 +699,12 @@ trait Promise[T] extends Future[T] { /** * Completes this Promise with the specified result, if not already completed. + * @throws IllegalStateException if already completed, this is to aid in debugging of complete-races, + * use tryComplete to do a conditional complete. * @return this */ - final def complete(value: Either[Throwable, T]): this.type = { tryComplete(value); this } + final def complete(value: Either[Throwable, T]): this.type = + if (tryComplete(value)) this else throw new IllegalStateException("Promise already completed: " + this + " tried to complete with " + value) /** * Completes this Promise with the specified result, if not already completed. @@ -721,7 +724,7 @@ trait Promise[T] extends Future[T] { * @return this. */ final def completeWith(other: Future[T]): this.type = { - other onComplete { complete(_) } + other onComplete { tryComplete(_) } this } @@ -840,7 +843,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac } } - def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = { + def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { @tailrec //Returns whether the future has already been completed or not def tryAddCallback(): Either[Throwable, T] = { val cur = getState @@ -858,9 +861,8 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac } } - private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case NonFatal(e) ⇒ executor.reportFailure(e) } - } + private final def notifyCompleted[U](func: Either[Throwable, T] ⇒ U, result: Either[Throwable, T]): Unit = + try func(result) catch { case NonFatal(e) ⇒ executor reportFailure e } } /** @@ -871,7 +873,7 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe val value = Some(resolve(suppliedValue)) def tryComplete(value: Either[Throwable, T]): Boolean = false - def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = { + def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { val completedAs = value.get Future dispatchTask (() ⇒ func(completedAs)) this From fac0d2e509f792321f27e62fcab5f8687078faaf Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 6 Feb 2012 12:48:03 +0100 Subject: [PATCH 02/17] Improving the imports of Future --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index e3821e8334..ad27c8f8be 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -8,18 +8,18 @@ import akka.event.Logging.Error import scala.Option import akka.japi.{ Function ⇒ JFunc, Option ⇒ JOption } import 
scala.util.continuations._ -import java.util.concurrent.TimeUnit.NANOSECONDS import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Duration, BoxedType } import akka.dispatch.Await.CanAwait -import java.util.concurrent._ import akka.util.NonFatal import akka.event.Logging.LogEventException import akka.event.Logging.Debug -import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } +import java.util.concurrent.TimeUnit.NANOSECONDS +import java.util.concurrent.{ ExecutionException, Callable, TimeoutException } +import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } object Await { From 09c44289d45c9b002f34f13e79cc214a4de43fce Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 6 Feb 2012 15:19:05 +0100 Subject: [PATCH 03/17] Creating our own FJTask and escalate exceptions to UHE --- .../akka/dispatch/AbstractDispatcher.scala | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 8a5bcfa385..e60b4fd335 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -14,9 +14,9 @@ import akka.event.EventStream import com.typesafe.config.Config import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension -import akka.jsr166y.ForkJoinPool import akka.util.NonFatal import akka.event.Logging.LogEventException +import akka.jsr166y.{ ForkJoinTask, ForkJoinPool } final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -424,7 +424,30 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr threadPoolConfig.createExecutorServiceFactory(name, threadFactory) } +object ForkJoinExecutorConfigurator { + class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory) extends ForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, true) { + override def execute(r: Runnable): Unit = r match { + case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) + case other ⇒ super.execute(other) + } + } + class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] { + final override def setRawResult(u: Unit): Unit = () + final override def getRawResult(): Unit = () + final override def exec(): Boolean = try { mailbox.run; true } catch { + case anything ⇒ + val t = Thread.currentThread + t.getUncaughtExceptionHandler match { + case null ⇒ + case some ⇒ some.uncaughtException(t, anything) + } + throw anything + } + } +} + class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { + import ForkJoinExecutorConfigurator._ def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match { case correct: ForkJoinPool.ForkJoinWorkerThreadFactory ⇒ correct @@ -433,7 +456,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = new ForkJoinPool(parallelism, threadFactory, 
MonitorableThreadFactory.doNothing, true) + def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory) } final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = new ForkJoinExecutorServiceFactory( From 66c1e2d835bc1c93db397144e4d646931ed6bbd3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 6 Feb 2012 15:42:28 +0100 Subject: [PATCH 04/17] Sprinkling some finals and adding return types --- .../akka/dispatch/AbstractDispatcher.scala | 17 ++++++++++++++--- .../src/main/scala/akka/dispatch/Mailbox.scala | 8 ++++---- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index e60b4fd335..0cf6c4a77b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -425,13 +425,24 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr } object ForkJoinExecutorConfigurator { - class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory) extends ForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, true) { + + /** + * INTERNAL AKKA USAGE ONLY + */ + final class AkkaForkJoinPool(parallelism: Int, + threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + unhandledExceptionHandler: Thread.UncaughtExceptionHandler) + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) { override def execute(r: Runnable): Unit = r match { case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) case other ⇒ super.execute(other) } } - class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] { + + /** + * INTERNAL AKKA USAGE ONLY + */ + final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] { final override def setRawResult(u: Unit): Unit = () final override def getRawResult(): Unit = () final override def exec(): Boolean = try { mailbox.run; true } catch { @@ -456,7 +467,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory) + def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing) } final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = new ForkJoinExecutorServiceFactory( diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 3aa6103b22..27853b49db 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -328,7 +328,7 @@ trait MailboxType { * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } @@ -339,7 +339,7 @@ case class BoundedMailbox( 
final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new LinkedBlockingQueue[Envelope](capacity) final val pushTimeOut = BoundedMailbox.this.pushTimeOut @@ -347,7 +347,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new PriorityBlockingQueue[Envelope](11, cmp) } @@ -358,7 +358,7 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut From 2acce5b17f688b403d6e358ea9eb03f657beb520 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 6 Feb 2012 16:42:42 +0100 Subject: [PATCH 05/17] Switching to using the internalClassLoader for loading the Serializers --- .../src/main/scala/akka/serialization/Serialization.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index e89adde8fb..05612b9cca 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -150,7 +150,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * Tries to load the specified Serializer by the FQN */ def serializerOf(serializerFQN: String): Either[Exception, Serializer] = - ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs) + ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs, system.internalClassLoader) /** * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) From 73fce5223545cf86ea9a0164921c3ed7a5af1e49 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 6 Feb 2012 16:59:09 +0100 Subject: [PATCH 06/17] Fixing returns to return in ScalaDoc --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 8 ++++---- akka-cluster/src/main/scala/akka/cluster/Gossiper.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git 
a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index ad27c8f8be..97ff17c075 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -53,7 +53,7 @@ object Await { * WARNING: Blocking operation, use with caution. * * @throws [[java.util.concurrent.TimeoutException]] if times out - * @returns The returned value as returned by Awaitable.ready + * @return The returned value as returned by Awaitable.ready */ def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost) @@ -62,7 +62,7 @@ object Await { * WARNING: Blocking operation, use with caution. * * @throws [[java.util.concurrent.TimeoutException]] if times out - * @returns The returned value as returned by Awaitable.result + * @return The returned value as returned by Awaitable.result */ def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) } @@ -984,7 +984,7 @@ abstract class Recover[+T] extends japi.RecoverBridge[T] { * This method will be invoked once when/if the Future this recover callback is registered on * becomes completed with a failure. * - * @returns a successful value for the passed in failure + * @return a successful value for the passed in failure * @throws the passed in failure to propagate it. * * Java API @@ -1007,7 +1007,7 @@ abstract class Filter[-T] extends japi.BooleanFunctionBridge[T] { * This method will be invoked once when/if a Future that this callback is registered on * becomes completed with a success. * - * @returns true if the successful value should be propagated to the new Future or not + * @return true if the successful value should be propagated to the new Future or not */ def filter(result: T): Boolean } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala index 1b9026d082..bb15223842 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala @@ -369,7 +369,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { /** * Gossips to a random member in the set of members passed in as argument. * - * @returns 'true' if it gossiped to a "seed" member. + * @return 'true' if it gossiped to a "seed" member. 
*/ private def gossipToRandomNodeOf(members: Set[Member]): Boolean = { val peers = members filter (_.address != address) // filter out myself From 4b56e754bc16db421bd90b35fcdc9e52ec135dfd Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 6 Feb 2012 17:06:08 +0100 Subject: [PATCH 07/17] Commenting out Akka Cluster because it shouldn't be Released, have fun Jonas ;) --- project/AkkaBuild.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index f810da86a6..1f763e582a 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -31,7 +31,7 @@ object AkkaBuild extends Build { Unidoc.unidocExclude := Seq(samples.id, tutorials.id), Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id) ), - aggregate = Seq(actor, testkit, actorTests, remote, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, actorMigration, samples, tutorials, docs) + aggregate = Seq(actor, testkit, actorTests, remote, /*cluster,*/ slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, actorMigration, samples, tutorials, docs) ) lazy val actor = Project( @@ -86,7 +86,7 @@ object AkkaBuild extends Build { ) ) configs (MultiJvm) - lazy val cluster = Project( + /*lazy val cluster = Project( id = "akka-cluster", base = file("akka-cluster"), dependencies = Seq(remote, remote % "test->test", testkit % "test->test"), @@ -103,7 +103,7 @@ object AkkaBuild extends Build { }, test in Test <<= (test in Test) dependsOn (test in MultiJvm) ) - ) configs (MultiJvm) + ) configs (MultiJvm)*/ lazy val slf4j = Project( id = "akka-slf4j", @@ -320,7 +320,7 @@ object AkkaBuild extends Build { lazy val docs = Project( id = "akka-docs", base = file("akka-docs"), - dependencies = Seq(actor, testkit % "test->test", remote, cluster, slf4j, agent, transactor, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), + dependencies = Seq(actor, testkit % "test->test", remote, /*cluster,*/ slf4j, agent, transactor, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), settings = defaultSettings ++ Seq( unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get }, libraryDependencies ++= Dependencies.docs, @@ -429,7 +429,7 @@ object Dependencies { Test.zookeeper, Test.log4j // needed for ZkBarrier in multi-jvm tests ) - val cluster = Seq(Test.junit, Test.scalatest) + //val cluster = Seq(Test.junit, Test.scalatest) val slf4j = Seq(slf4jApi) From d7435547ff69ebdc4161c72ef1ecf68b839e36d8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 6 Feb 2012 17:29:17 +0100 Subject: [PATCH 08/17] Revert "Commenting out Akka Cluster because it shouldn't be Released, have fun Jonas ;)" This reverts commit 4b56e754bc16db421bd90b35fcdc9e52ec135dfd. 
--- project/AkkaBuild.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 1f763e582a..f810da86a6 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -31,7 +31,7 @@ object AkkaBuild extends Build { Unidoc.unidocExclude := Seq(samples.id, tutorials.id), Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id) ), - aggregate = Seq(actor, testkit, actorTests, remote, /*cluster,*/ slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, actorMigration, samples, tutorials, docs) + aggregate = Seq(actor, testkit, actorTests, remote, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, actorMigration, samples, tutorials, docs) ) lazy val actor = Project( @@ -86,7 +86,7 @@ object AkkaBuild extends Build { ) ) configs (MultiJvm) - /*lazy val cluster = Project( + lazy val cluster = Project( id = "akka-cluster", base = file("akka-cluster"), dependencies = Seq(remote, remote % "test->test", testkit % "test->test"), @@ -103,7 +103,7 @@ object AkkaBuild extends Build { }, test in Test <<= (test in Test) dependsOn (test in MultiJvm) ) - ) configs (MultiJvm)*/ + ) configs (MultiJvm) lazy val slf4j = Project( id = "akka-slf4j", @@ -320,7 +320,7 @@ object AkkaBuild extends Build { lazy val docs = Project( id = "akka-docs", base = file("akka-docs"), - dependencies = Seq(actor, testkit % "test->test", remote, /*cluster,*/ slf4j, agent, transactor, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), + dependencies = Seq(actor, testkit % "test->test", remote, cluster, slf4j, agent, transactor, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), settings = defaultSettings ++ Seq( unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get }, libraryDependencies ++= Dependencies.docs, @@ -429,7 +429,7 @@ object Dependencies { Test.zookeeper, Test.log4j // needed for ZkBarrier in multi-jvm tests ) - //val cluster = Seq(Test.junit, Test.scalatest) + val cluster = Seq(Test.junit, Test.scalatest) val slf4j = Seq(slf4jApi) From 1dbce493593b67d52cac3baa06a8211433560204 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 6 Feb 2012 21:12:26 +0100 Subject: [PATCH 09/17] Configure serializer with class as key. 
See #1789 --- .../akka/serialization/SerializeSpec.scala | 96 +++++++------ akka-actor/src/main/resources/reference.conf | 24 ++-- .../akka/serialization/Serialization.scala | 132 +++++++++--------- .../SerializationDocTestBase.java | 54 ------- akka-docs/java/serialization.rst | 41 ++---- .../serialization/SerializationDocSpec.scala | 32 ++--- akka-docs/scala/serialization.rst | 33 +---- akka-remote/src/main/resources/reference.conf | 15 ++ .../ProtobufSerializerSpec.scala | 25 ++++ 9 files changed, 207 insertions(+), 245 deletions(-) create mode 100644 akka-remote/src/test/scala/akka/serialization/ProtobufSerializerSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index df2170905e..8cf314e0d5 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -5,7 +5,6 @@ package akka.serialization import akka.testkit.AkkaSpec -import com.typesafe.config.ConfigFactory import akka.actor._ import java.io._ import akka.dispatch.Await @@ -17,21 +16,25 @@ import akka.pattern.ask object SerializeSpec { - val serializationConf = ConfigFactory.parseString(""" + val config = """ akka { actor { serializers { - java = "akka.serialization.JavaSerializer" test = "akka.serialization.TestSerializer" } serialization-bindings { - java = ["akka.serialization.SerializeSpec$Person", "akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - test = ["akka.serialization.TestSerializble", "akka.serialization.SerializeSpec$PlainMessage"] + "akka.serialization.SerializeSpec$Person" = java + "akka.serialization.SerializeSpec$Address" = java + "akka.serialization.TestSerializble" = test + "akka.serialization.SerializeSpec$PlainMessage" = test + "akka.serialization.SerializeSpec$A" = java + "akka.serialization.SerializeSpec$B" = test + "akka.serialization.SerializeSpec$D" = test } } } - """) + """ @BeanInfo case class Address(no: String, street: String, city: String, zip: String) { def this() = this("", "", "", "") } @@ -54,10 +57,18 @@ object SerializeSpec { class ExtendedPlainMessage extends PlainMessage + class Both(s: String) extends SimpleMessage(s) with Serializable + + trait A + trait B + class C extends B with A + class D extends A + class E extends D + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { +class SerializeSpec extends AkkaSpec(SerializeSpec.config) { import SerializeSpec._ val ser = SerializationExtension(system) @@ -69,8 +80,8 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { "Serialization" must { "have correct bindings" in { - ser.bindings(addr.getClass.getName) must be("java") - ser.bindings(classOf[PlainMessage].getName) must be("test") + ser.bindings.find(_._1 == addr.getClass).map(_._2.getClass) must be(Some(classOf[JavaSerializer])) + ser.bindings.find(_._1 == classOf[PlainMessage]).map(_._2.getClass) must be(Some(classOf[TestSerializer])) } "serialize Address" in { @@ -144,58 +155,64 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { } } - "resove serializer by direct interface" in { - val msg = new SimpleMessage("foo") - ser.serializerFor(msg.getClass).getClass must 
be(classOf[TestSerializer]) + "resolve serializer by direct interface" in { + ser.serializerFor(classOf[SimpleMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer by interface implemented by super class" in { - val msg = new ExtendedSimpleMessage("foo", 17) - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer by interface implemented by super class" in { + ser.serializerFor(classOf[ExtendedSimpleMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer by indirect interface" in { - val msg = new AnotherMessage - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer by indirect interface" in { + ser.serializerFor(classOf[AnotherMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer by indirect interface implemented by super class" in { - val msg = new ExtendedAnotherMessage - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer by indirect interface implemented by super class" in { + ser.serializerFor(classOf[ExtendedAnotherMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer for message with binding" in { - val msg = new PlainMessage - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer for message with binding" in { + ser.serializerFor(classOf[PlainMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer for message extending class with with binding" in { - val msg = new ExtendedPlainMessage - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer for message extending class with with binding" in { + ser.serializerFor(classOf[ExtendedPlainMessage]).getClass must be(classOf[TestSerializer]) + } + + "resolve serializer for message with several bindings" in { + ser.serializerFor(classOf[Both]).getClass must be(classOf[TestSerializer]) + } + + "resolve serializer in the order of the bindings" in { + ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer]) + ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer]) + ser.serializerFor(classOf[C]).getClass must be(classOf[JavaSerializer]) + } + + "resolve serializer in the order of most specific binding first" in { + ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer]) + ser.serializerFor(classOf[D]).getClass must be(classOf[TestSerializer]) + ser.serializerFor(classOf[E]).getClass must be(classOf[TestSerializer]) + } + + "throw java.io.NotSerializableException when no binding" in { + intercept[java.io.NotSerializableException] { + ser.serializerFor(classOf[Actor]) + } } } } object VerifySerializabilitySpec { - val conf = ConfigFactory.parseString(""" + val conf = """ akka { actor { serialize-messages = on - serialize-creators = on - - serializers { - java = "akka.serialization.JavaSerializer" - default = "akka.serialization.JavaSerializer" - } - - serialization-bindings { - java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - } } } - """) + """ class FooActor extends Actor { def receive = { @@ -210,6 +227,7 @@ object VerifySerializabilitySpec { } } +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) { import 
VerifySerializabilitySpec._ implicit val timeout = Timeout(5 seconds) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index cdab8e968e..ebeb7766ba 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -97,7 +97,7 @@ akka { paths = [] } - # Routers with dynamically resizable number of routees; this feature is enabled + # Routers with dynamically resizable number of routees; this feature is enabled # by including (parts of) this section in the deployment resizer { @@ -262,23 +262,19 @@ akka { event-stream = off } - # Entries for pluggable serializers and their bindings. If a binding for a specific - # class is not found, then the default serializer (Java serialization) is used. + # Entries for pluggable serializers and their bindings. serializers { - # java = "akka.serialization.JavaSerializer" + java = "akka.serialization.JavaSerializer" # proto = "akka.serialization.ProtobufSerializer" - - default = "akka.serialization.JavaSerializer" } - # serialization-bindings { - # java = ["akka.serialization.SerializeSpec$Address", - # "akka.serialization.MyJavaSerializableActor", - # "akka.serialization.MyStatelessActorWithMessagesInMailbox", - # "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - # proto = ["com.google.protobuf.Message", - # "akka.actor.ProtobufProtocol$MyMessage"] - # } + # Class to Serializer binding. You only need to specify the name of an interface + # or abstract base class of the messages. In case of ambiguity it is primarily + # using the most specific configured class, and secondly the entry configured first. + serialization-bindings { + "java.io.Serializable" = java + #"com.google.protobuf.Message" = proto + } } # Used to set the behavior of the scheduler. diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 05612b9cca..2d313b05a0 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -8,14 +8,21 @@ import akka.AkkaException import akka.util.ReflectiveAccess import scala.util.DynamicVariable import com.typesafe.config.Config -import akka.config.ConfigurationException import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address } import java.util.concurrent.ConcurrentHashMap import akka.event.Logging +import scala.collection.mutable.ArrayBuffer +import java.io.NotSerializableException case class NoSerializerFoundException(m: String) extends AkkaException(m) object Serialization { + + /** + * Tuple that represents mapping from Class to Serializer + */ + type ClassSerializer = (Class[_], Serializer) + /** * This holds a reference to the current ActorSystem (the surrounding context) * during serialization and deserialization. 
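The net effect of this patch is that serialization bindings are keyed by message class
rather than by serializer alias. A minimal sketch, assuming a hypothetical message class
com.example.MyMessage and the `java` alias configured under akka.actor.serializers:

    akka.actor.serialization-bindings {
      "com.example.MyMessage" = java   # alias must match an entry in akka.actor.serializers
    }

    val serialization = SerializationExtension(system)
    // resolved via isAssignableFrom: the most specific configured class wins,
    // then the entry configured first
    val serializer = serialization.serializerFor(classOf[MyMessage])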
@@ -40,28 +47,19 @@ object Serialization { import scala.collection.JavaConverters._ import config._ - val Serializers: Map[String, String] = - getConfig("akka.actor.serializers").root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } + val Serializers: Map[String, String] = configToMap(getConfig("akka.actor.serializers")) - val SerializationBindings: Map[String, Seq[String]] = { - val configPath = "akka.actor.serialization-bindings" - hasPath(configPath) match { - case false ⇒ Map() - case true ⇒ - val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).root.unwrapped.asScala.toMap.map { - case (k: String, v: java.util.Collection[_]) ⇒ (k -> v.asScala.toSeq.asInstanceOf[Seq[String]]) - case invalid ⇒ throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid)) - } - serializationBindings + val SerializationBindings: Map[String, String] = configToMap(getConfig("akka.actor.serialization-bindings")) + + private def configToMap(cfg: Config): Map[String, String] = + cfg.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } - } - } } } /** * Serialization module. Contains methods for serialization and deserialization as well as - * locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. + * locating a Serializer for a particular class as defined in the mapping in the configuration. */ class Serialization(val system: ExtendedActorSystem) extends Extension { import Serialization._ @@ -105,8 +103,10 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { } catch { case e: Exception ⇒ Left(e) } /** - * Returns the Serializer configured for the given object, returns the NullSerializer if it's null, - * falls back to the Serializer named "default" + * Returns the Serializer configured for the given object, returns the NullSerializer if it's null. + * + * @throws akka.config.ConfigurationException if no `serialization-bindings` is configured for the + * class of the object */ def findSerializerFor(o: AnyRef): Serializer = o match { case null ⇒ NullSerializer @@ -114,82 +114,86 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { } /** - * Returns the configured Serializer for the given Class, falls back to the Serializer named "default". - * It traverses interfaces and super classes to find any configured Serializer that match - * the class name. + * Returns the configured Serializer for the given Class. The configured Serializer + * is used if the configured class `isAssignableFrom` from the `clazz`, i.e. + * the configured class is a super class or implemented interface. In case of + * ambiguity it is primarily using the most specific configured class, + * and secondly the entry configured first. 
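With the bindings configured in SerializeSpec above (A = java, B = test, D = test, where
A and B are unrelated traits and D extends A), resolution works out as follows:

    ser.serializerFor(classOf[C])  // C extends B with A: neither binding is more specific,
                                   // so the first configured entry (A = java) wins
    ser.serializerFor(classOf[E])  // E extends D extends A: D is the more specific
                                   // binding, so the test serializer wins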
+ * + * @throws java.io.NotSerializableException if no `serialization-bindings` is configured for the class */ def serializerFor(clazz: Class[_]): Serializer = - if (bindings.isEmpty) { - // quick path to default when no bindings are registered - serializers("default") - } else { - - def resolve(c: Class[_]): Option[Serializer] = - serializerMap.get(c.getName) match { - case null ⇒ - val classes = c.getInterfaces ++ Option(c.getSuperclass) - classes.view map resolve collectFirst { case Some(x) ⇒ x } - case x ⇒ Some(x) + serializerMap.get(clazz) match { + case null ⇒ + val ser = bindings.find { case (c, s) ⇒ c.isAssignableFrom(clazz) } match { + case None ⇒ throw new NotSerializableException( + "No configured serialization-bindings for class [%s]" format clazz.getName) + case Some((c, s)) ⇒ s } - - serializerMap.get(clazz.getName) match { - case null ⇒ - val ser = resolve(clazz).getOrElse(serializers("default")) - // memorize the lookups for performance - serializerMap.putIfAbsent(clazz.getName, ser) match { - case null ⇒ - log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) - ser - case some ⇒ some - } - case ser ⇒ ser - } + // memorize for performance + serializerMap.putIfAbsent(clazz, ser) match { + case null ⇒ + log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) + ser + case some ⇒ some + } + case ser ⇒ ser } /** - * Tries to load the specified Serializer by the FQN + * Tries to instantiate the specified Serializer by the FQN */ def serializerOf(serializerFQN: String): Either[Exception, Serializer] = ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs, system.internalClassLoader) /** * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) - * By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer - * But "default" can be overridden in config + * By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer */ - lazy val serializers: Map[String, Serializer] = { - val serializersConf = settings.Serializers - for ((k: String, v: String) ← serializersConf) + private val serializers: Map[String, Serializer] = { + for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).fold(throw _, identity) } /** - * bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used + * bindings is a Seq of tuple representing the mapping from Class to Serializer. + * It is primarily ordered by the most specific classes first, and secondly in the configured order. 
*/ - lazy val bindings: Map[String, String] = { - settings.SerializationBindings.foldLeft(Map[String, String]()) { - //All keys which are lists, take the Strings from them and Map them - case (result, (k: String, vs: Seq[_])) ⇒ result ++ (vs collect { case v: String ⇒ (v, k) }) - //For any other values, just skip them - case (result, _) ⇒ result + private[akka] val bindings: Seq[ClassSerializer] = { + val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings) yield { + val c = ReflectiveAccess.getClassFor(k, system.internalClassLoader).fold(throw _, (c: Class[_]) ⇒ c) + (c, serializers(v)) } + sort(configuredBindings) } /** - * serializerMap is a Map whose keys = FQN of class that is serializable and values is the serializer to be used for that class + * Sort so that subtypes always precede their supertypes, but without + * obeying any order between unrelated subtypes (insert sort). */ - private lazy val serializerMap: ConcurrentHashMap[String, Serializer] = { - val serializerMap = new ConcurrentHashMap[String, Serializer] - for ((k, v) ← bindings) { - serializerMap.put(k, serializers(v)) + private def sort(in: Iterable[ClassSerializer]): Seq[ClassSerializer] = + (new ArrayBuffer[ClassSerializer](in.size) /: in) { (buf, ca) ⇒ + buf.indexWhere(_._1 isAssignableFrom ca._1) match { + case -1 ⇒ buf append ca + case x ⇒ buf insert (x, ca) + } + buf } + + /** + * serializerMap is a Map whose keys is the class that is serializable and values is the serializer + * to be used for that class. + */ + private val serializerMap: ConcurrentHashMap[Class[_], Serializer] = { + val serializerMap = new ConcurrentHashMap[Class[_], Serializer] + for ((c, s) ← bindings) serializerMap.put(c, s) serializerMap } /** * Maps from a Serializer Identity (Int) to a Serializer instance (optimization) */ - lazy val serializerByIdentity: Map[Int, Serializer] = + val serializerByIdentity: Map[Int, Serializer] = Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) ⇒ (v.identifier, v) } } diff --git a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java index b68f8f2e79..3db385ca1c 100644 --- a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java @@ -54,61 +54,7 @@ public class SerializationDocTestBase { } } //#my-own-serializer - @Test public void haveExamples() { - /* - //#serialize-messages-config - akka { - actor { - serialize-messages = on - } - } - //#serialize-messages-config - //#serialize-creators-config - akka { - actor { - serialize-creators = on - } - } - //#serialize-creators-config - - - //#serialize-serializers-config - akka { - actor { - serializers { - default = "akka.serialization.JavaSerializer" - - myown = "akka.docs.serialization.MyOwnSerializer" - } - } - } - //#serialize-serializers-config - - //#serialization-bindings-config - akka { - actor { - serializers { - default = "akka.serialization.JavaSerializer" - java = "akka.serialization.JavaSerializer" - proto = "akka.serialization.ProtobufSerializer" - myown = "akka.docs.serialization.MyOwnSerializer" - } - - serialization-bindings { - java = ["java.lang.String", - "app.my.Customer"] - proto = ["com.google.protobuf.Message"] - myown = ["my.own.BusinessObject", - "something.equally.Awesome", - "akka.docs.serialization.MyOwnSerializable" - "java.lang.Boolean"] - } - } - } - 
//#serialization-bindings-config - */ - } @Test public void demonstrateTheProgrammaticAPI() { //#programmatic diff --git a/akka-docs/java/serialization.rst b/akka-docs/java/serialization.rst index 2920538ded..b0721e82cc 100644 --- a/akka-docs/java/serialization.rst +++ b/akka-docs/java/serialization.rst @@ -25,47 +25,28 @@ For Akka to know which ``Serializer`` to use for what, you need edit your :ref:` in the "akka.actor.serializers"-section you bind names to implementations of the ``akka.serialization.Serializer`` you wish to use, like this: -.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-serializers-config - -.. note:: - - The name ``default`` is special in the sense that the ``Serializer`` - mapped to it will be used as default. +.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config After you've bound names to different implementations of ``Serializer`` you need to wire which classes should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section: -.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialization-bindings-config +.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config -.. note:: +You only need to specify the name of an interface or abstract base class of the messages. In case of ambiguity, +i.e. the message implements several of the configured classes, it is primarily using the most specific +configured class, and secondly the entry configured first. - You only need to specify the name of an interface or abstract base class if the messages implements - that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. - -Protobuf --------- - -Akka provides a ``Serializer`` for `protobuf `_ messages. -To use that you need to add the following to the configuration:: - - akka { - actor { - serializers { - proto = "akka.serialization.ProtobufSerializer" - } - - serialization-bindings { - proto = ["com.google.protobuf.Message"] - } - } - } +Akka provides serializers for ``java.io.Serializable`` and `protobuf `_ +``com.google.protobuf.Message`` by default, so normally you don't need to add configuration for that, but +it can be done to force a specific serializer in case messages implements both ``java.io.Serializable`` +and ``com.google.protobuf.Message``. Verification ------------ If you want to verify that your messages are serializable you can enable the following config option: -.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-messages-config +.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-messages-config .. warning:: @@ -74,7 +55,7 @@ If you want to verify that your messages are serializable you can enable the fol If you want to verify that your ``Props`` are serializable you can enable the following config option: -.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-creators-config +.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-creators-config .. 
warning:: diff --git a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala index 7f1553f75c..ce40adce3e 100644 --- a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala @@ -45,6 +45,9 @@ class MyOwnSerializer extends Serializer { } //#my-own-serializer +trait MyOwnSerializable +case class Customer(name: String) extends MyOwnSerializable + class SerializationDocSpec extends AkkaSpec { "demonstrate configuration of serialize messages" in { //#serialize-messages-config @@ -82,8 +85,8 @@ class SerializationDocSpec extends AkkaSpec { akka { actor { serializers { - default = "akka.serialization.JavaSerializer" - + java = "akka.serialization.JavaSerializer" + proto = "akka.serialization.ProtobufSerializer" myown = "akka.docs.serialization.MyOwnSerializer" } } @@ -91,8 +94,6 @@ class SerializationDocSpec extends AkkaSpec { """) //#serialize-serializers-config val a = ActorSystem("system", config) - SerializationExtension(a).serializers("default").getClass.getName must equal("akka.serialization.JavaSerializer") - SerializationExtension(a).serializers("myown").getClass.getName must equal("akka.docs.serialization.MyOwnSerializer") a.shutdown() } @@ -102,31 +103,26 @@ class SerializationDocSpec extends AkkaSpec { akka { actor { serializers { - default = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer" proto = "akka.serialization.ProtobufSerializer" myown = "akka.docs.serialization.MyOwnSerializer" } serialization-bindings { - java = ["java.lang.String", - "app.my.Customer"] - proto = ["com.google.protobuf.Message"] - myown = ["my.own.BusinessObject", - "something.equally.Awesome", - "akka.docs.serialization.MyOwnSerializable" - "java.lang.Boolean"] - } + "java.lang.String" = java + "akka.docs.serialization.Customer" = java + "com.google.protobuf.Message" = proto + "akka.docs.serialization.MyOwnSerializable" = myown + "java.lang.Boolean" = myown + } } } """) //#serialization-bindings-config val a = ActorSystem("system", config) - SerializationExtension(a).serializers("default").getClass.getName must equal("akka.serialization.JavaSerializer") - SerializationExtension(a).serializers("java").getClass.getName must equal("akka.serialization.JavaSerializer") - SerializationExtension(a).serializers("myown").getClass.getName must equal("akka.docs.serialization.MyOwnSerializer") - SerializationExtension(a).serializerFor(classOf[String]).getClass.getName must equal("akka.serialization.JavaSerializer") - SerializationExtension(a).serializerFor(classOf[java.lang.Boolean]).getClass.getName must equal("akka.docs.serialization.MyOwnSerializer") + SerializationExtension(a).serializerFor(classOf[String]).getClass must equal(classOf[JavaSerializer]) + SerializationExtension(a).serializerFor(classOf[Customer]).getClass must equal(classOf[JavaSerializer]) + SerializationExtension(a).serializerFor(classOf[java.lang.Boolean]).getClass must equal(classOf[MyOwnSerializer]) a.shutdown() } diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst index 6a0867dea2..9cfaf909b9 100644 --- a/akka-docs/scala/serialization.rst +++ b/akka-docs/scala/serialization.rst @@ -27,38 +27,19 @@ you wish to use, like this: .. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config -.. 
note:: - - The name ``default`` is special in the sense that the ``Serializer`` - mapped to it will be used as default. - After you've bound names to different implementations of ``Serializer`` you need to wire which classes should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section: .. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config -.. note:: +You only need to specify the name of an interface or abstract base class of the messages. In case of ambiguity, +i.e. the message implements several of the configured classes, it is primarily using the most specific +configured class, and secondly the entry configured first. - You only need to specify the name of an interface or abstract base class if the messages implements - that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. - -Protobuf --------- - -Akka provides a ``Serializer`` for `protobuf `_ messages. -To use that you need to add the following to the configuration:: - - akka { - actor { - serializers { - proto = "akka.serialization.ProtobufSerializer" - } - - serialization-bindings { - proto = ["com.google.protobuf.Message"] - } - } - } +Akka provides serializers for ``java.io.Serializable`` and `protobuf `_ +``com.google.protobuf.Message`` by default, so normally you don't need to add configuration for that, but +it can be done to force a specific serializer in case messages implements both ``java.io.Serializable`` +and ``com.google.protobuf.Message``. Verification ------------ diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 1158d12295..14269d305a 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -9,6 +9,21 @@ akka { actor { + # Entries for pluggable serializers and their bindings. + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.serialization.ProtobufSerializer" + } + + + # Class to Serializer binding. You only need to specify the name of an interface + # or abstract base class of the messages. In case of ambiguity it is primarily + # using the most specific configured class, and secondly the entry configured first. + serialization-bindings { + "com.google.protobuf.Message" = proto + "java.io.Serializable" = java + } + deployment { default { diff --git a/akka-remote/src/test/scala/akka/serialization/ProtobufSerializerSpec.scala b/akka-remote/src/test/scala/akka/serialization/ProtobufSerializerSpec.scala new file mode 100644 index 0000000000..474ef485d7 --- /dev/null +++ b/akka-remote/src/test/scala/akka/serialization/ProtobufSerializerSpec.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package akka.serialization + +import akka.testkit.AkkaSpec +import akka.remote.RemoteProtocol.MessageProtocol +import akka.actor.ProtobufProtocol.MyMessage + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ProtobufSerializerSpec extends AkkaSpec { + + val ser = SerializationExtension(system) + + "Serialization" must { + + "resolve protobuf serializer" in { + ser.serializerFor(classOf[MessageProtocol]).getClass must be(classOf[ProtobufSerializer]) + ser.serializerFor(classOf[MyMessage]).getClass must be(classOf[ProtobufSerializer]) + } + + } +} + From 239df9d5fbd47ab15b8f0ea48b2e7dad0a0c52c7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 6 Feb 2012 22:20:38 +0100 Subject: [PATCH 10/17] Improvements after review --- .../scala/akka/serialization/SerializeSpec.scala | 4 ++-- .../scala/akka/serialization/Serialization.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 8cf314e0d5..7b91e29bcb 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -80,8 +80,8 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) { "Serialization" must { "have correct bindings" in { - ser.bindings.find(_._1 == addr.getClass).map(_._2.getClass) must be(Some(classOf[JavaSerializer])) - ser.bindings.find(_._1 == classOf[PlainMessage]).map(_._2.getClass) must be(Some(classOf[TestSerializer])) + ser.bindings.collectFirst { case (c, s) if c == addr.getClass ⇒ s.getClass } must be(Some(classOf[JavaSerializer])) + ser.bindings.collectFirst { case (c, s) if c == classOf[PlainMessage] ⇒ s.getClass } must be(Some(classOf[TestSerializer])) } "serialize Address" in { diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 2d313b05a0..1a23833383 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -125,11 +125,11 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { def serializerFor(clazz: Class[_]): Serializer = serializerMap.get(clazz) match { case null ⇒ - val ser = bindings.find { case (c, s) ⇒ c.isAssignableFrom(clazz) } match { - case None ⇒ throw new NotSerializableException( - "No configured serialization-bindings for class [%s]" format clazz.getName) - case Some((c, s)) ⇒ s - } + val ser = bindings.collectFirst { + case (c, s) if c.isAssignableFrom(clazz) ⇒ s + } getOrElse (throw new NotSerializableException( + "No configured serialization-bindings for class [%s]" format clazz.getName)) + // memorize for performance serializerMap.putIfAbsent(clazz, ser) match { case null ⇒ @@ -161,7 +161,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { */ private[akka] val bindings: Seq[ClassSerializer] = { val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings) yield { - val c = ReflectiveAccess.getClassFor(k, system.internalClassLoader).fold(throw _, (c: Class[_]) ⇒ c) + val c = ReflectiveAccess.getClassFor(k, system.internalClassLoader).fold(throw _, identity[Class[_]]) (c, serializers(v)) } sort(configuredBindings) From 0bd4663c911d7ddf4a5a7d9c1636982e29d88218 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 7 
Feb 2012 09:50:03 +0100 Subject: [PATCH 11/17] PinnedDispatcher config and docs for dispatcher executor. * Update PinnedDispatcher config in tests and docs. See #1796 * Update dispatchers doc with info about executor. See #1795 --- .../scala/akka/actor/SupervisorMiscSpec.scala | 1 + .../akka/actor/dispatch/ActorModelSpec.scala | 2 ++ .../akka/actor/dispatch/PinnedActorSpec.scala | 1 + .../CallingThreadDispatcherModelSpec.scala | 1 + akka-actor/src/main/resources/reference.conf | 3 +- .../src/main/scala/akka/AkkaException.scala | 1 + akka-agent/src/main/resources/reference.conf | 2 ++ akka-docs/java/dispatchers.rst | 17 +++++++++-- .../docs/dispatcher/DispatcherDocSpec.scala | 30 ++++++++++++++++++- akka-docs/scala/dispatchers.rst | 17 +++++++++-- akka-remote/src/main/resources/reference.conf | 1 + akka-zeromq/src/main/resources/reference.conf | 1 + 12 files changed, 71 insertions(+), 6 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index fccfc75d98..caeacfb3db 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -14,6 +14,7 @@ import akka.util.duration._ object SupervisorMiscSpec { val config = """ pinned-dispatcher { + executor = thread-pool-executor type = PinnedDispatcher } test-dispatcher { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 45e1954486..8c949f8776 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -435,6 +435,7 @@ object DispatcherModelSpec { val config = { """ boss { + executor = thread-pool-executor type = PinnedDispatcher } """ + @@ -506,6 +507,7 @@ object BalancingDispatcherModelSpec { val config = { """ boss { + executor = thread-pool-executor type = PinnedDispatcher } """ + diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index 6c66784e5d..cf8dd5eab5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -12,6 +12,7 @@ import akka.pattern.ask object PinnedActorSpec { val config = """ pinned-dispatcher { + executor = thread-pool-executor type = PinnedDispatcher } """ diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index ab0f7ec6eb..4693a56536 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -16,6 +16,7 @@ object CallingThreadDispatcherModelSpec { val config = { """ boss { + executor = thread-pool-executor type = PinnedDispatcher } """ + diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index cdab8e968e..94e78fbbb7 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -156,7 +156,8 @@ akka { # the same type), PinnedDispatcher, or a FQCN to a class inheriting # MessageDispatcherConfigurator with a constructor with 
       # com.typesafe.config.Config parameter and akka.dispatch.DispatcherPrerequisites
-      # parameters
+      # parameters.
+      # PinnedDispatcher must be used together with executor=thread-pool-executor.
       type = "Dispatcher"
 
       # Which kind of ExecutorService to use for this dispatcher
diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala
index 47f7465535..ed079e678b 100644
--- a/akka-actor/src/main/scala/akka/AkkaException.scala
+++ b/akka-actor/src/main/scala/akka/AkkaException.scala
@@ -23,6 +23,7 @@ object AkkaException {
     sb.append("\tat %s\n" format trace(i))
     sb.toString
   }
+
 }
 
 /**
diff --git a/akka-agent/src/main/resources/reference.conf b/akka-agent/src/main/resources/reference.conf
index 67da6e3821..7009c0f432 100644
--- a/akka-agent/src/main/resources/reference.conf
+++ b/akka-agent/src/main/resources/reference.conf
@@ -10,11 +10,13 @@ akka {
 
     # The dispatcher used for agent-send-off actor
    send-off-dispatcher {
+      executor = thread-pool-executor
       type = PinnedDispatcher
     }
 
     # The dispatcher used for agent-alter-off actor
    alter-off-dispatcher {
+      executor = thread-pool-executor
       type = PinnedDispatcher
     }
 
diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst
index fd117f65f9..fceb94abbc 100644
--- a/akka-docs/java/dispatchers.rst
+++ b/akka-docs/java/dispatchers.rst
@@ -55,6 +55,17 @@ Default values are taken from ``default-dispatcher``, i.e. all options doesn't n
 :ref:`configuration` for the default values of the ``default-dispatcher``. You can also override the values
 for the ``default-dispatcher`` in your configuration.
 
+There are two different executor services:
+
+* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for
+  ``default-dispatcher``.
+* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
+
+Note that the pool size is configured differently for the two executor services. The configuration above
+is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``:
+
+.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
+
 Let's now walk through the different dispatchers in more detail.
 
 Thread-based
 ^^^^^^^^^^^^
@@ -67,9 +78,11 @@ has worse performance and scalability than the event-based dispatcher but works
 a low frequency of messages and are allowed to go off and do their own thing for a longer period of time.
 Another advantage with this dispatcher is that Actors do not block threads for each other.
 
-The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this:
+The ``PinnedDispatcher`` is configured like this:
 
-.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher
+.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
+
+Note that it must be used with ``executor = "thread-pool-executor"``.
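For illustration, a minimal sketch of attaching an actor to the pinned dispatcher defined above — ``MyActor``, the system name and the inline config are placeholders, and it assumes the ``my-pinned-dispatcher`` block from ``DispatcherDocSpec`` is visible to the ``ActorSystem``::

  import akka.actor.{ Actor, ActorSystem, Props }
  import com.typesafe.config.ConfigFactory

  class MyActor extends Actor {
    def receive = {
      case msg ⇒ // work runs on this actor's own, dedicated thread
    }
  }

  // placeholder config: the my-pinned-dispatcher block shown above,
  // with the usual reference.conf defaults as fallback
  val conf = ConfigFactory.parseString("""
    my-pinned-dispatcher {
      executor = "thread-pool-executor"
      type = PinnedDispatcher
    }
  """).withFallback(ConfigFactory.load())

  val system = ActorSystem("PinnedExample", conf)
  // withDispatcher refers to the dispatcher by its configuration path
  val worker = system.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"))

Because the ``PinnedDispatcher`` dedicates one thread per actor, it only works on top of a ``thread-pool-executor``, which is why the ``executor`` setting above is mandatory.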
Event-based ^^^^^^^^^^^ diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index 0df4e3ca5b..cd57fbeddc 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -20,6 +20,27 @@ object DispatcherDocSpec { val config = """ //#my-dispatcher-config my-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the number of messages that are processed in a batch before the + # thread is returned to the pool. Set to 1 for as fair as possible. + throughput = 100 + } + //#my-dispatcher-config + + //#my-thread-pool-dispatcher-config + my-thread-pool-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use @@ -37,7 +58,14 @@ object DispatcherDocSpec { # thread is returned to the pool. Set to 1 for as fair as possible. throughput = 100 } - //#my-dispatcher-config + //#my-thread-pool-dispatcher-config + + //#my-pinned-dispatcher-config + my-pinned-dispatcher { + executor = "thread-pool-executor" + type = PinnedDispatcher + } + //#my-pinned-dispatcher-config //#my-bounded-config my-dispatcher-bounded-queue { diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index c9923cf459..c6e6ae23e3 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -54,6 +54,17 @@ Default values are taken from ``default-dispatcher``, i.e. all options doesn't n :ref:`configuration` for the default values of the ``default-dispatcher``. You can also override the values for the ``default-dispatcher`` in your configuration. +There are two different executor services: + +* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for + ``default-dispatcher``. +* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``. + +Note that the pool size is configured differently for the two executor services. The configuration above +is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: + +.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config + Let's now walk through the different dispatchers in more detail. Thread-based @@ -66,9 +77,11 @@ has worse performance and scalability than the event-based dispatcher but works a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other. -The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this: +The ``PinnedDispatcher`` is configured like this: -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher +.. 
includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config + +Note that it must be used with ``executor = "thread-pool-executor"``. Event-based ^^^^^^^^^^^ diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 1158d12295..4c5bca190f 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -133,6 +133,7 @@ akka { # The dispatcher used for the system actor "network-event-sender" network-event-sender-dispatcher { + executor = thread-pool-executor type = PinnedDispatcher } } diff --git a/akka-zeromq/src/main/resources/reference.conf b/akka-zeromq/src/main/resources/reference.conf index 54922b8386..cfb5756156 100644 --- a/akka-zeromq/src/main/resources/reference.conf +++ b/akka-zeromq/src/main/resources/reference.conf @@ -15,6 +15,7 @@ akka { socket-dispatcher { # A zeromq socket needs to be pinned to the thread that created it. # Changing this value results in weird errors and race conditions within zeromq + executor = thread-pool-executor type = "PinnedDispatcher" } } From e12804660fcc481da13adedc2911f168cc26a08c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 7 Feb 2012 11:02:01 +0100 Subject: [PATCH 12/17] Fixing doc error in remoting --- akka-docs/java/remoting.rst | 2 +- akka-docs/scala/remoting.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-docs/java/remoting.rst b/akka-docs/java/remoting.rst index 196d7a40a5..9e0eb4eec9 100644 --- a/akka-docs/java/remoting.rst +++ b/akka-docs/java/remoting.rst @@ -26,7 +26,7 @@ to your ``application.conf`` file:: } remote { transport = "akka.remote.netty.NettyRemoteTransport" - server { + netty { hostname = "127.0.0.1" port = 2552 } diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index ae2aa0f411..f874e15a1b 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -27,7 +27,7 @@ to your ``application.conf`` file:: } remote { transport = "akka.remote.netty.NettyRemoteTransport" - server { + netty { hostname = "127.0.0.1" port = 2552 } From 3f067581428daea82491171a5ddd9c8374ceaa23 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 7 Feb 2012 11:14:09 +0100 Subject: [PATCH 13/17] Adding link to config for application.conf --- akka-kernel/src/main/dist/config/application.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-kernel/src/main/dist/config/application.conf b/akka-kernel/src/main/dist/config/application.conf index 2f7ad95abd..682d41e484 100644 --- a/akka-kernel/src/main/dist/config/application.conf +++ b/akka-kernel/src/main/dist/config/application.conf @@ -1,2 +1,3 @@ # In this file you can override any option defined in the 'reference.conf' files. # Copy in all or parts of the 'reference.conf' files and modify as you please. +# For more info about config, please visit the Akka Documentation: http://akka.io/docs/akka/2.0-SNAPSHOT/ From ac3868659305b2e1a891e7ac88ee66e2eb7ee9e8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 7 Feb 2012 12:00:50 +0100 Subject: [PATCH 14/17] System.exit on fatal error and removed actor null check. 
See #1799 and #1768 * Config property jvmExitOnFatalError * System.exit in case of fatal error, such as OutOfMemoryError * Adjusted NonFatal extractor, ok with StackOverflowError, not ok with LinkageError * Removed the actor null check in ActorCell --- .../src/test/scala/akka/config/ConfigSpec.scala | 3 +++ .../src/test/scala/akka/util/NonFatalSpec.scala | 12 +++++------- akka-actor/src/main/resources/reference.conf | 5 ++++- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 5 +---- .../src/main/scala/akka/actor/ActorSystem.scala | 2 ++ akka-actor/src/main/scala/akka/util/NonFatal.scala | 10 ++++++---- 6 files changed, 21 insertions(+), 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index ad39057d1d..9a0eab8830 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -34,6 +34,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { getMilliseconds("akka.scheduler.tickDuration") must equal(100) settings.SchedulerTickDuration must equal(100 millis) + + settings.Daemonicity must be(false) + settings.JvmExitOnFatalError must be(true) } { diff --git a/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala b/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala index f7d9789ec3..0c4bc295fb 100644 --- a/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala @@ -19,18 +19,16 @@ class NonFatalSpec extends AkkaSpec with MustMatchers { } } - "not match StackOverflowError" in { + "match StackOverflowError" in { //not @tailrec def blowUp(n: Long): Long = { blowUp(n + 1) + 1 } - intercept[StackOverflowError] { - try { - blowUp(0) - } catch { - case NonFatal(e) ⇒ assert(false) - } + try { + blowUp(0) + } catch { + case NonFatal(e) ⇒ // as expected } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index cdab8e968e..db81180fe7 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -36,6 +36,9 @@ akka { # Toggles whether the threads created by this ActorSystem should be daemons or not daemonic = off + # JVM shutdown, System.exit(-1), in case of a fatal error, such as OutOfMemoryError + jvmExitOnFatalError = on + actor { provider = "akka.actor.LocalActorRefProvider" @@ -97,7 +100,7 @@ akka { paths = [] } - # Routers with dynamically resizable number of routees; this feature is enabled + # Routers with dynamically resizable number of routees; this feature is enabled # by including (parts of) this section in the deployment resizer { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index f86f64fa99..2b56bef5d1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -476,10 +476,7 @@ private[akka] class ActorCell( cancelReceiveTimeout() // FIXME: leave this here??? messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - // FIXME: actor can be null when creation fails with fatal error, why? 
- case msg if actor == null ⇒ - system.eventStream.publish(Warning(self.path.toString, this.getClass, "Ignoring message due to null actor [%s]" format msg)) - case msg ⇒ actor(msg) + case msg ⇒ actor(msg) } currentMessage = null // reset current message after successful invocation } catch { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index b20cdf5f1c..72f2505994 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -92,6 +92,7 @@ object ActorSystem { final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") final val Daemonicity = getBoolean("akka.daemonic") + final val JvmExitOnFatalError = getBoolean("akka.jvmExitOnFatalError") if (ConfigVersion != Version) throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") @@ -348,6 +349,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten log.error(cause, "Uncaught error from thread [{}]", thread.getName) cause match { case NonFatal(_) | _: InterruptedException ⇒ + case _ if settings.JvmExitOnFatalError ⇒ System.exit(-1) case _ ⇒ shutdown() } } diff --git a/akka-actor/src/main/scala/akka/util/NonFatal.scala b/akka-actor/src/main/scala/akka/util/NonFatal.scala index 36bb1960e9..ae7d91c7a3 100644 --- a/akka-actor/src/main/scala/akka/util/NonFatal.scala +++ b/akka-actor/src/main/scala/akka/util/NonFatal.scala @@ -5,8 +5,9 @@ package akka.util /** * Extractor of non-fatal Throwables. Will not match fatal errors - * like VirtualMachineError (OutOfMemoryError, StackOverflowError) - * ThreadDeath, and InterruptedException. + * like VirtualMachineError (OutOfMemoryError) + * ThreadDeath, LinkageError and InterruptedException. + * StackOverflowError is matched, i.e. considered non-fatal. 
* * Usage to catch all harmless throwables: * {{{ @@ -20,8 +21,9 @@ package akka.util object NonFatal { def unapply(t: Throwable): Option[Throwable] = t match { - // VirtualMachineError includes OutOfMemoryError, StackOverflowError and other fatal errors - case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException ⇒ None + case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError + // VirtualMachineError includes OutOfMemoryError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None case e ⇒ Some(e) } From 8b9f1caf67b0c9bd7c75358b17c3584fb39e9ef5 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 7 Feb 2012 15:11:16 +0100 Subject: [PATCH 15/17] change serialization to strictly rely on subtyping MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - when encountering new message type, check all bindings which map apply - if multiple are found, choose the most specific one if that exists or verify that all mappings yield the same serializer - in case of remaining ambiguity, throw exception - also add special handling for “none” serializer mapping: turn off a default --- .../akka/serialization/SerializeSpec.scala | 12 +++----- akka-actor/src/main/resources/reference.conf | 9 +++--- .../akka/serialization/Serialization.scala | 28 +++++++++++-------- akka-docs/java/serialization.rst | 28 +++++++++++++------ akka-docs/scala/serialization.rst | 28 +++++++++++++------ akka-remote/src/main/resources/reference.conf | 11 ++++---- 6 files changed, 72 insertions(+), 44 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 7b91e29bcb..9ab93ed974 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -179,14 +179,10 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) { ser.serializerFor(classOf[ExtendedPlainMessage]).getClass must be(classOf[TestSerializer]) } - "resolve serializer for message with several bindings" in { - ser.serializerFor(classOf[Both]).getClass must be(classOf[TestSerializer]) - } - - "resolve serializer in the order of the bindings" in { - ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer]) - ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer]) - ser.serializerFor(classOf[C]).getClass must be(classOf[JavaSerializer]) + "throw exception for message with several bindings" in { + intercept[java.io.NotSerializableException] { + ser.serializerFor(classOf[Both]) + } } "resolve serializer in the order of most specific binding first" in { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index fb0ad24a75..4e6c42c704 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -269,15 +269,16 @@ akka { # Entries for pluggable serializers and their bindings. serializers { java = "akka.serialization.JavaSerializer" - # proto = "akka.serialization.ProtobufSerializer" } # Class to Serializer binding. You only need to specify the name of an interface - # or abstract base class of the messages. In case of ambiguity it is primarily - # using the most specific configured class, and secondly the entry configured first. + # or abstract base class of the messages. 
In case of ambiguity it is + # using the most specific configured class, throwing an exception otherwise. + # + # To disable one of the default serializers, assign its class to "none", like + # "java.io.Serializable" = none serialization-bindings { "java.io.Serializable" = java - #"com.google.protobuf.Message" = proto } } diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 1a23833383..3e86de6c1c 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -125,17 +125,23 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { def serializerFor(clazz: Class[_]): Serializer = serializerMap.get(clazz) match { case null ⇒ - val ser = bindings.collectFirst { - case (c, s) if c.isAssignableFrom(clazz) ⇒ s - } getOrElse (throw new NotSerializableException( - "No configured serialization-bindings for class [%s]" format clazz.getName)) + // bindings are ordered from most specific to least specific + def unique(cs: Seq[Class[_]], ser: Set[Serializer]): Boolean = (cs forall (_ isAssignableFrom cs(0))) || ser.size == 1 - // memorize for performance - serializerMap.putIfAbsent(clazz, ser) match { - case null ⇒ - log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) - ser - case some ⇒ some + val possible = bindings filter { _._1 isAssignableFrom clazz } + possible.size match { + case 0 ⇒ + throw new NotSerializableException("No configured serialization-bindings for class [%s]" format clazz.getName) + case x if x == 1 || unique(possible map (_._1), possible.map(_._2)(scala.collection.breakOut)) ⇒ + val ser = possible(0)._2 + serializerMap.putIfAbsent(clazz, ser) match { + case null ⇒ + log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) + ser + case some ⇒ some + } + case _ ⇒ + throw new NotSerializableException("Multiple serializers found for " + clazz + ": " + possible) } case ser ⇒ ser } @@ -160,7 +166,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * It is primarily ordered by the most specific classes first, and secondly in the configured order. */ private[akka] val bindings: Seq[ClassSerializer] = { - val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings) yield { + val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield { val c = ReflectiveAccess.getClassFor(k, system.internalClassLoader).fold(throw _, identity[Class[_]]) (c, serializers(v)) } diff --git a/akka-docs/java/serialization.rst b/akka-docs/java/serialization.rst index b0721e82cc..dad2174f91 100644 --- a/akka-docs/java/serialization.rst +++ b/akka-docs/java/serialization.rst @@ -32,14 +32,26 @@ should be serialized using which ``Serializer``, this is done in the "akka.actor .. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config -You only need to specify the name of an interface or abstract base class of the messages. In case of ambiguity, -i.e. the message implements several of the configured classes, it is primarily using the most specific -configured class, and secondly the entry configured first. +You only need to specify the name of an interface or abstract base class of the +messages. In case of ambiguity, i.e. 
the message implements several of the +configured classes, the most specific configured class will be used, i.e. the +one of which all other candidates are superclasses. If this condition cannot be +met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply +and neither is a subtype of the other, an exception will be thrown during +serialization. -Akka provides serializers for ``java.io.Serializable`` and `protobuf `_ -``com.google.protobuf.Message`` by default, so normally you don't need to add configuration for that, but -it can be done to force a specific serializer in case messages implements both ``java.io.Serializable`` -and ``com.google.protobuf.Message``. +Akka provides serializers for :class:`java.io.Serializable` and `protobuf +`_ +:class:`com.google.protobuf.GeneratedMessage` by default (the latter only if +depending on the akka-remote module), so normally you don't need to add +configuration for that; since :class:`com.google.protobuf.GeneratedMessage` +implements :class:`java.io.Serializable`, protobuf messages will always by +serialized using the protobuf protocol unless specifically overridden. In order +to disable a default serializer, map its marker type to “none”:: + + akka.actor.serialization-bindings { + "java.io.Serializable" = none + } Verification ------------ @@ -91,4 +103,4 @@ which is done by extending ``akka.serialization.JSerializer``, like this: :exclude: ... Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then -list which classes that should be serialized using it. \ No newline at end of file +list which classes that should be serialized using it. diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst index 9cfaf909b9..ec0f4ca706 100644 --- a/akka-docs/scala/serialization.rst +++ b/akka-docs/scala/serialization.rst @@ -32,14 +32,26 @@ should be serialized using which ``Serializer``, this is done in the "akka.actor .. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config -You only need to specify the name of an interface or abstract base class of the messages. In case of ambiguity, -i.e. the message implements several of the configured classes, it is primarily using the most specific -configured class, and secondly the entry configured first. +You only need to specify the name of an interface or abstract base class of the +messages. In case of ambiguity, i.e. the message implements several of the +configured classes, the most specific configured class will be used, i.e. the +one of which all other candidates are superclasses. If this condition cannot be +met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply +and neither is a subtype of the other, an exception will be thrown during +serialization. -Akka provides serializers for ``java.io.Serializable`` and `protobuf `_ -``com.google.protobuf.Message`` by default, so normally you don't need to add configuration for that, but -it can be done to force a specific serializer in case messages implements both ``java.io.Serializable`` -and ``com.google.protobuf.Message``. 
+Akka provides serializers for :class:`java.io.Serializable` and `protobuf +`_ +:class:`com.google.protobuf.GeneratedMessage` by default (the latter only if +depending on the akka-remote module), so normally you don't need to add +configuration for that; since :class:`com.google.protobuf.GeneratedMessage` +implements :class:`java.io.Serializable`, protobuf messages will always by +serialized using the protobuf protocol unless specifically overridden. In order +to disable a default serializer, map its marker type to “none”:: + + akka.actor.serialization-bindings { + "java.io.Serializable" = none + } Verification ------------ @@ -89,4 +101,4 @@ First you need to create a class definition of your ``Serializer`` like so: :exclude: ... Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then -list which classes that should be serialized using it. \ No newline at end of file +list which classes that should be serialized using it. diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 38e0de27cc..a8854fc9df 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -11,17 +11,18 @@ akka { # Entries for pluggable serializers and their bindings. serializers { - java = "akka.serialization.JavaSerializer" proto = "akka.serialization.ProtobufSerializer" } # Class to Serializer binding. You only need to specify the name of an interface - # or abstract base class of the messages. In case of ambiguity it is primarily - # using the most specific configured class, and secondly the entry configured first. + # or abstract base class of the messages. In case of ambiguity it is + # using the most specific configured class, giving an error if two mappings are found + # which cannot be decided by sub-typing relation. 
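To make the binding rules above concrete, a small sketch (the system name is a placeholder and it assumes the stock ``reference.conf`` bindings are still on the classpath as fallback): mapping the default ``java.io.Serializable`` binding to ``none`` leaves no binding for a plain serializable class, so the lookup fails::

  import akka.actor.ActorSystem
  import akka.serialization.SerializationExtension
  import com.typesafe.config.ConfigFactory
  import java.io.NotSerializableException

  // turn off the default Java serialization binding, as described above
  val conf = ConfigFactory.parseString("""
    akka.actor.serialization-bindings {
      "java.io.Serializable" = none
    }
  """).withFallback(ConfigFactory.load())

  val system = ActorSystem("NoJavaSerialization", conf)

  // String normally resolves to the JavaSerializer via the "java.io.Serializable"
  // binding; with that binding disabled there is no match left, so serializerFor
  // throws NotSerializableException
  try SerializationExtension(system).serializerFor(classOf[String])
  catch { case e: NotSerializableException ⇒ println("no binding: " + e.getMessage) }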
serialization-bindings { - "com.google.protobuf.Message" = proto - "java.io.Serializable" = java + # Since com.google.protobuf.Message does not extend Serializable but GeneratedMessage + # does, need to use the more specific one here in order to avoid ambiguity + "com.google.protobuf.GeneratedMessage" = proto } deployment { From 224ce7f773f0ca8aeb2a7391596036e4b33fad7e Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 7 Feb 2012 15:51:41 +0100 Subject: [PATCH 16/17] tone it down: just a Warning in case of ambiguous serializers --- .../akka/serialization/SerializeSpec.scala | 16 ++++++++++++---- akka-actor/src/main/resources/reference.conf | 4 ++-- .../akka/serialization/Serialization.scala | 19 ++++++++++--------- akka-docs/java/serialization.rst | 3 +-- akka-docs/scala/serialization.rst | 3 +-- akka-remote/src/main/resources/reference.conf | 8 +++----- 6 files changed, 29 insertions(+), 24 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 9ab93ed974..f510f848f8 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -4,7 +4,7 @@ package akka.serialization -import akka.testkit.AkkaSpec +import akka.testkit.{ AkkaSpec, EventFilter } import akka.actor._ import java.io._ import akka.dispatch.Await @@ -179,9 +179,17 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) { ser.serializerFor(classOf[ExtendedPlainMessage]).getClass must be(classOf[TestSerializer]) } - "throw exception for message with several bindings" in { - intercept[java.io.NotSerializableException] { - ser.serializerFor(classOf[Both]) + "give warning for message with several bindings" in { + EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept { + ser.serializerFor(classOf[Both]).getClass must be(classOf[TestSerializer]) + } + } + + "resolve serializer in the order of the bindings" in { + ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer]) + ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer]) + EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept { + ser.serializerFor(classOf[C]).getClass must be(classOf[JavaSerializer]) } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 4e6c42c704..ce9bf684e8 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -272,8 +272,8 @@ akka { } # Class to Serializer binding. You only need to specify the name of an interface - # or abstract base class of the messages. In case of ambiguity it is - # using the most specific configured class, throwing an exception otherwise. + # or abstract base class of the messages. In case of ambiguity it is using the + # most specific configured class, or giving a warning and choosing the “first” one. 
# # To disable one of the default serializers, assign its class to "none", like # "java.io.Serializable" = none diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 3e86de6c1c..b26adcfc36 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -129,19 +129,20 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { def unique(cs: Seq[Class[_]], ser: Set[Serializer]): Boolean = (cs forall (_ isAssignableFrom cs(0))) || ser.size == 1 val possible = bindings filter { _._1 isAssignableFrom clazz } - possible.size match { + val ser = possible.size match { case 0 ⇒ throw new NotSerializableException("No configured serialization-bindings for class [%s]" format clazz.getName) case x if x == 1 || unique(possible map (_._1), possible.map(_._2)(scala.collection.breakOut)) ⇒ - val ser = possible(0)._2 - serializerMap.putIfAbsent(clazz, ser) match { - case null ⇒ - log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) - ser - case some ⇒ some - } + possible(0)._2 case _ ⇒ - throw new NotSerializableException("Multiple serializers found for " + clazz + ": " + possible) + log.warning("Multiple serializers found for " + clazz + ", choosing first: " + possible) + possible(0)._2 + } + serializerMap.putIfAbsent(clazz, ser) match { + case null ⇒ + log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) + ser + case some ⇒ some } case ser ⇒ ser } diff --git a/akka-docs/java/serialization.rst b/akka-docs/java/serialization.rst index dad2174f91..e44d69f162 100644 --- a/akka-docs/java/serialization.rst +++ b/akka-docs/java/serialization.rst @@ -37,8 +37,7 @@ messages. In case of ambiguity, i.e. the message implements several of the configured classes, the most specific configured class will be used, i.e. the one of which all other candidates are superclasses. If this condition cannot be met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply -and neither is a subtype of the other, an exception will be thrown during -serialization. +and neither is a subtype of the other, a warning will be issued. Akka provides serializers for :class:`java.io.Serializable` and `protobuf `_ diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst index ec0f4ca706..4735548140 100644 --- a/akka-docs/scala/serialization.rst +++ b/akka-docs/scala/serialization.rst @@ -37,8 +37,7 @@ messages. In case of ambiguity, i.e. the message implements several of the configured classes, the most specific configured class will be used, i.e. the one of which all other candidates are superclasses. If this condition cannot be met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply -and neither is a subtype of the other, an exception will be thrown during -serialization. +and neither is a subtype of the other, a warning will be issued Akka provides serializers for :class:`java.io.Serializable` and `protobuf `_ diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a8854fc9df..017b531faa 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -5,20 +5,18 @@ # This the reference config file has all the default settings. # Make your edits/overrides in your application.conf. 
+# comments above akka.actor settings left out where they are already in akka- +# actor.jar, because otherwise they would be repeated in config rendering. + akka { actor { - # Entries for pluggable serializers and their bindings. serializers { proto = "akka.serialization.ProtobufSerializer" } - # Class to Serializer binding. You only need to specify the name of an interface - # or abstract base class of the messages. In case of ambiguity it is - # using the most specific configured class, giving an error if two mappings are found - # which cannot be decided by sub-typing relation. serialization-bindings { # Since com.google.protobuf.Message does not extend Serializable but GeneratedMessage # does, need to use the more specific one here in order to avoid ambiguity From c40d76c0ab4cc77615c2c1d5fdb6291368008dc2 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 7 Feb 2012 16:21:48 +0100 Subject: [PATCH 17/17] review-based improvements on Serialization --- .../akka/serialization/Serialization.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index b26adcfc36..fbcd5e9f9e 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -126,17 +126,18 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { serializerMap.get(clazz) match { case null ⇒ // bindings are ordered from most specific to least specific - def unique(cs: Seq[Class[_]], ser: Set[Serializer]): Boolean = (cs forall (_ isAssignableFrom cs(0))) || ser.size == 1 + def unique(possibilities: Seq[(Class[_], Serializer)]): Boolean = + possibilities.size == 1 || + (possibilities map (_._1) forall (_ isAssignableFrom possibilities(0)._1)) || + (possibilities map (_._2) forall (_ == possibilities(0)._2)) - val possible = bindings filter { _._1 isAssignableFrom clazz } - val ser = possible.size match { - case 0 ⇒ + val ser = bindings filter { _._1 isAssignableFrom clazz } match { + case Seq() ⇒ throw new NotSerializableException("No configured serialization-bindings for class [%s]" format clazz.getName) - case x if x == 1 || unique(possible map (_._1), possible.map(_._2)(scala.collection.breakOut)) ⇒ - possible(0)._2 - case _ ⇒ - log.warning("Multiple serializers found for " + clazz + ", choosing first: " + possible) - possible(0)._2 + case possibilities ⇒ + if (!unique(possibilities)) + log.warning("Multiple serializers found for " + clazz + ", choosing first: " + possibilities) + possibilities(0)._2 } serializerMap.putIfAbsent(clazz, ser) match { case null ⇒
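To round off the serializer-resolution changes above, a small usage sketch (the system name is a placeholder; the default ``reference.conf`` bindings are assumed): the first lookup for a class scans the bindings for the most specific match and memoizes the result, so later lookups return the cached serializer without re-scanning or re-warning::

  import akka.actor.ActorSystem
  import akka.serialization.SerializationExtension

  val system = ActorSystem("SerializationResolution")
  val serialization = SerializationExtension(system)

  // first call: bindings assignable from String are collected, the most specific
  // one wins ("java.io.Serializable" -> JavaSerializer with the defaults) and the
  // choice is cached via serializerMap.putIfAbsent
  val first = serialization.serializerFor(classOf[String])

  // second call: answered straight from the cache
  val second = serialization.serializerFor(classOf[String])
  assert(first eq second)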