diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index fccfc75d98..caeacfb3db 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -14,6 +14,7 @@ import akka.util.duration._ object SupervisorMiscSpec { val config = """ pinned-dispatcher { + executor = thread-pool-executor type = PinnedDispatcher } test-dispatcher { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 45e1954486..8c949f8776 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -435,6 +435,7 @@ object DispatcherModelSpec { val config = { """ boss { + executor = thread-pool-executor type = PinnedDispatcher } """ + @@ -506,6 +507,7 @@ object BalancingDispatcherModelSpec { val config = { """ boss { + executor = thread-pool-executor type = PinnedDispatcher } """ + diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index 6c66784e5d..cf8dd5eab5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -12,6 +12,7 @@ import akka.pattern.ask object PinnedActorSpec { val config = """ pinned-dispatcher { + executor = thread-pool-executor type = PinnedDispatcher } """ diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index ad39057d1d..9a0eab8830 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -34,6 +34,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { getMilliseconds("akka.scheduler.tickDuration") must equal(100) settings.SchedulerTickDuration must equal(100 millis) + + settings.Daemonicity must be(false) + settings.JvmExitOnFatalError must be(true) } { diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index df2170905e..f510f848f8 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -4,8 +4,7 @@ package akka.serialization -import akka.testkit.AkkaSpec -import com.typesafe.config.ConfigFactory +import akka.testkit.{ AkkaSpec, EventFilter } import akka.actor._ import java.io._ import akka.dispatch.Await @@ -17,21 +16,25 @@ import akka.pattern.ask object SerializeSpec { - val serializationConf = ConfigFactory.parseString(""" + val config = """ akka { actor { serializers { - java = "akka.serialization.JavaSerializer" test = "akka.serialization.TestSerializer" } serialization-bindings { - java = ["akka.serialization.SerializeSpec$Person", "akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - test = ["akka.serialization.TestSerializble", "akka.serialization.SerializeSpec$PlainMessage"] + "akka.serialization.SerializeSpec$Person" = java + "akka.serialization.SerializeSpec$Address" = java + "akka.serialization.TestSerializble" = test + "akka.serialization.SerializeSpec$PlainMessage" = test + "akka.serialization.SerializeSpec$A" = java + "akka.serialization.SerializeSpec$B" = test + "akka.serialization.SerializeSpec$D" = test } } } - """) + """ @BeanInfo case class Address(no: String, street: String, city: String, zip: String) { def this() = this("", "", "", "") } @@ -54,10 +57,18 @@ object SerializeSpec { class ExtendedPlainMessage extends PlainMessage + class Both(s: String) extends SimpleMessage(s) with Serializable + + trait A + trait B + class C extends B with A + class D extends A + class E extends D + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { +class SerializeSpec extends AkkaSpec(SerializeSpec.config) { import SerializeSpec._ val ser = SerializationExtension(system) @@ -69,8 +80,8 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { "Serialization" must { "have correct bindings" in { - ser.bindings(addr.getClass.getName) must be("java") - ser.bindings(classOf[PlainMessage].getName) must be("test") + ser.bindings.collectFirst { case (c, s) if c == addr.getClass ⇒ s.getClass } must be(Some(classOf[JavaSerializer])) + ser.bindings.collectFirst { case (c, s) if c == classOf[PlainMessage] ⇒ s.getClass } must be(Some(classOf[TestSerializer])) } "serialize Address" in { @@ -144,58 +155,68 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { } } - "resove serializer by direct interface" in { - val msg = new SimpleMessage("foo") - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer by direct interface" in { + ser.serializerFor(classOf[SimpleMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer by interface implemented by super class" in { - val msg = new ExtendedSimpleMessage("foo", 17) - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer by interface implemented by super class" in { + ser.serializerFor(classOf[ExtendedSimpleMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer by indirect interface" in { - val msg = new AnotherMessage - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer by indirect interface" in { + ser.serializerFor(classOf[AnotherMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer by indirect interface implemented by super class" in { - val msg = new ExtendedAnotherMessage - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer by indirect interface implemented by super class" in { + ser.serializerFor(classOf[ExtendedAnotherMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer for message with binding" in { - val msg = new PlainMessage - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer for message with binding" in { + ser.serializerFor(classOf[PlainMessage]).getClass must be(classOf[TestSerializer]) } - "resove serializer for message extending class with with binding" in { - val msg = new ExtendedPlainMessage - ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + "resolve serializer for message extending class with with binding" in { + ser.serializerFor(classOf[ExtendedPlainMessage]).getClass must be(classOf[TestSerializer]) + } + + "give warning for message with several bindings" in { + EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept { + ser.serializerFor(classOf[Both]).getClass must be(classOf[TestSerializer]) + } + } + + "resolve serializer in the order of the bindings" in { + ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer]) + ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer]) + EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept { + ser.serializerFor(classOf[C]).getClass must be(classOf[JavaSerializer]) + } + } + + "resolve serializer in the order of most specific binding first" in { + ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer]) + ser.serializerFor(classOf[D]).getClass must be(classOf[TestSerializer]) + ser.serializerFor(classOf[E]).getClass must be(classOf[TestSerializer]) + } + + "throw java.io.NotSerializableException when no binding" in { + intercept[java.io.NotSerializableException] { + ser.serializerFor(classOf[Actor]) + } } } } object VerifySerializabilitySpec { - val conf = ConfigFactory.parseString(""" + val conf = """ akka { actor { serialize-messages = on - serialize-creators = on - - serializers { - java = "akka.serialization.JavaSerializer" - default = "akka.serialization.JavaSerializer" - } - - serialization-bindings { - java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - } } } - """) + """ class FooActor extends Actor { def receive = { @@ -210,6 +231,7 @@ object VerifySerializabilitySpec { } } +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) { import VerifySerializabilitySpec._ implicit val timeout = Timeout(5 seconds) diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index ab0f7ec6eb..4693a56536 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -16,6 +16,7 @@ object CallingThreadDispatcherModelSpec { val config = { """ boss { + executor = thread-pool-executor type = PinnedDispatcher } """ + diff --git a/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala b/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala index f7d9789ec3..0c4bc295fb 100644 --- a/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/NonFatalSpec.scala @@ -19,18 +19,16 @@ class NonFatalSpec extends AkkaSpec with MustMatchers { } } - "not match StackOverflowError" in { + "match StackOverflowError" in { //not @tailrec def blowUp(n: Long): Long = { blowUp(n + 1) + 1 } - intercept[StackOverflowError] { - try { - blowUp(0) - } catch { - case NonFatal(e) ⇒ assert(false) - } + try { + blowUp(0) + } catch { + case NonFatal(e) ⇒ // as expected } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index cdab8e968e..ce9bf684e8 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -36,6 +36,9 @@ akka { # Toggles whether the threads created by this ActorSystem should be daemons or not daemonic = off + # JVM shutdown, System.exit(-1), in case of a fatal error, such as OutOfMemoryError + jvmExitOnFatalError = on + actor { provider = "akka.actor.LocalActorRefProvider" @@ -97,7 +100,7 @@ akka { paths = [] } - # Routers with dynamically resizable number of routees; this feature is enabled + # Routers with dynamically resizable number of routees; this feature is enabled # by including (parts of) this section in the deployment resizer { @@ -156,7 +159,8 @@ akka { # the same type), PinnedDispatcher, or a FQCN to a class inheriting # MessageDispatcherConfigurator with a constructor with # com.typesafe.config.Config parameter and akka.dispatch.DispatcherPrerequisites - # parameters + # parameters. + # PinnedDispatcher must be used toghether with executor=thread-pool-executor. type = "Dispatcher" # Which kind of ExecutorService to use for this dispatcher @@ -262,23 +266,20 @@ akka { event-stream = off } - # Entries for pluggable serializers and their bindings. If a binding for a specific - # class is not found, then the default serializer (Java serialization) is used. + # Entries for pluggable serializers and their bindings. serializers { - # java = "akka.serialization.JavaSerializer" - # proto = "akka.serialization.ProtobufSerializer" - - default = "akka.serialization.JavaSerializer" + java = "akka.serialization.JavaSerializer" } - # serialization-bindings { - # java = ["akka.serialization.SerializeSpec$Address", - # "akka.serialization.MyJavaSerializableActor", - # "akka.serialization.MyStatelessActorWithMessagesInMailbox", - # "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - # proto = ["com.google.protobuf.Message", - # "akka.actor.ProtobufProtocol$MyMessage"] - # } + # Class to Serializer binding. You only need to specify the name of an interface + # or abstract base class of the messages. In case of ambiguity it is using the + # most specific configured class, or giving a warning and choosing the “first” one. + # + # To disable one of the default serializers, assign its class to "none", like + # "java.io.Serializable" = none + serialization-bindings { + "java.io.Serializable" = java + } } # Used to set the behavior of the scheduler. diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index 47f7465535..ed079e678b 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -23,6 +23,7 @@ object AkkaException { sb.append("\tat %s\n" format trace(i)) sb.toString } + } /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index f86f64fa99..2b56bef5d1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -476,10 +476,7 @@ private[akka] class ActorCell( cancelReceiveTimeout() // FIXME: leave this here??? messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - // FIXME: actor can be null when creation fails with fatal error, why? - case msg if actor == null ⇒ - system.eventStream.publish(Warning(self.path.toString, this.getClass, "Ignoring message due to null actor [%s]" format msg)) - case msg ⇒ actor(msg) + case msg ⇒ actor(msg) } currentMessage = null // reset current message after successful invocation } catch { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index b20cdf5f1c..72f2505994 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -92,6 +92,7 @@ object ActorSystem { final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") final val Daemonicity = getBoolean("akka.daemonic") + final val JvmExitOnFatalError = getBoolean("akka.jvmExitOnFatalError") if (ConfigVersion != Version) throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") @@ -348,6 +349,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten log.error(cause, "Uncaught error from thread [{}]", thread.getName) cause match { case NonFatal(_) | _: InterruptedException ⇒ + case _ if settings.JvmExitOnFatalError ⇒ System.exit(-1) case _ ⇒ shutdown() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 8a5bcfa385..0cf6c4a77b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -14,9 +14,9 @@ import akka.event.EventStream import com.typesafe.config.Config import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension -import akka.jsr166y.ForkJoinPool import akka.util.NonFatal import akka.event.Logging.LogEventException +import akka.jsr166y.{ ForkJoinTask, ForkJoinPool } final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -424,7 +424,41 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr threadPoolConfig.createExecutorServiceFactory(name, threadFactory) } +object ForkJoinExecutorConfigurator { + + /** + * INTERNAL AKKA USAGE ONLY + */ + final class AkkaForkJoinPool(parallelism: Int, + threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + unhandledExceptionHandler: Thread.UncaughtExceptionHandler) + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) { + override def execute(r: Runnable): Unit = r match { + case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) + case other ⇒ super.execute(other) + } + } + + /** + * INTERNAL AKKA USAGE ONLY + */ + final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] { + final override def setRawResult(u: Unit): Unit = () + final override def getRawResult(): Unit = () + final override def exec(): Boolean = try { mailbox.run; true } catch { + case anything ⇒ + val t = Thread.currentThread + t.getUncaughtExceptionHandler match { + case null ⇒ + case some ⇒ some.uncaughtException(t, anything) + } + throw anything + } + } +} + class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { + import ForkJoinExecutorConfigurator._ def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match { case correct: ForkJoinPool.ForkJoinWorkerThreadFactory ⇒ correct @@ -433,7 +467,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = new ForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, true) + def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing) } final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = new ForkJoinExecutorServiceFactory( diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index c781595aeb..97ff17c075 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -8,18 +8,18 @@ import akka.event.Logging.Error import scala.Option import akka.japi.{ Function ⇒ JFunc, Option ⇒ JOption } import scala.util.continuations._ -import java.util.concurrent.TimeUnit.NANOSECONDS import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Duration, BoxedType } -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger } import akka.dispatch.Await.CanAwait -import java.util.concurrent._ import akka.util.NonFatal import akka.event.Logging.LogEventException import akka.event.Logging.Debug +import java.util.concurrent.TimeUnit.NANOSECONDS +import java.util.concurrent.{ ExecutionException, Callable, TimeoutException } +import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } object Await { @@ -53,7 +53,7 @@ object Await { * WARNING: Blocking operation, use with caution. * * @throws [[java.util.concurrent.TimeoutException]] if times out - * @returns The returned value as returned by Awaitable.ready + * @return The returned value as returned by Awaitable.ready */ def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost) @@ -62,7 +62,7 @@ object Await { * WARNING: Blocking operation, use with caution. * * @throws [[java.util.concurrent.TimeoutException]] if times out - * @returns The returned value as returned by Awaitable.result + * @return The returned value as returned by Awaitable.result */ def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) } @@ -192,7 +192,7 @@ object Future { def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val futureResult = Promise[T]() - val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult complete _ + val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult tryComplete _ futures.foreach(_ onComplete completeFirst) futureResult @@ -208,12 +208,12 @@ object Future { val ref = new AtomicInteger(futures.size) val search: Either[Throwable, T] ⇒ Unit = v ⇒ try { v match { - case Right(r) ⇒ if (predicate(r)) result success Some(r) + case Right(r) ⇒ if (predicate(r)) result tryComplete Right(Some(r)) case _ ⇒ } } finally { if (ref.decrementAndGet == 0) - result success None + result tryComplete Right(None) } futures.foreach(_ onComplete search) @@ -279,13 +279,13 @@ object Future { * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { - val future = Promise[A] + val p = Promise[A] dispatchTask({ () ⇒ - (reify(body) foreachFull (future success, future failure): Future[Any]) onFailure { - case e: Exception ⇒ future failure e + (reify(body) foreachFull (p success, p failure): Future[Any]) onFailure { + case NonFatal(e) ⇒ p tryComplete Left(e) } }, true) - future + p.future } /** @@ -379,7 +379,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { case Left(t) ⇒ p failure t case Right(r) ⇒ that onSuccess { case r2 ⇒ p success ((r, r2)) } } - that onFailure { case f ⇒ p failure f } + that onFailure { case f ⇒ p tryComplete Left(f) } p.future } @@ -411,7 +411,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { * callbacks may be registered; there is no guarantee that they will be * executed in a particular order. */ - def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type + def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type /** * When the future is completed with a valid result, apply the provided @@ -483,7 +483,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { val p = Promise[A]() onComplete { - case Left(e) if pf isDefinedAt e ⇒ p.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) + case Left(e) if pf isDefinedAt e ⇒ p.complete(try { Right(pf(e)) } catch { case NonFatal(x) ⇒ Left(x) }) case otherwise ⇒ p complete otherwise } p.future @@ -699,9 +699,12 @@ trait Promise[T] extends Future[T] { /** * Completes this Promise with the specified result, if not already completed. + * @throws IllegalStateException if already completed, this is to aid in debugging of complete-races, + * use tryComplete to do a conditional complete. * @return this */ - final def complete(value: Either[Throwable, T]): this.type = { tryComplete(value); this } + final def complete(value: Either[Throwable, T]): this.type = + if (tryComplete(value)) this else throw new IllegalStateException("Promise already completed: " + this + " tried to complete with " + value) /** * Completes this Promise with the specified result, if not already completed. @@ -721,7 +724,7 @@ trait Promise[T] extends Future[T] { * @return this. */ final def completeWith(other: Future[T]): this.type = { - other onComplete { complete(_) } + other onComplete { tryComplete(_) } this } @@ -840,7 +843,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac } } - def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = { + def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { @tailrec //Returns whether the future has already been completed or not def tryAddCallback(): Either[Throwable, T] = { val cur = getState @@ -858,9 +861,8 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac } } - private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case NonFatal(e) ⇒ executor.reportFailure(e) } - } + private final def notifyCompleted[U](func: Either[Throwable, T] ⇒ U, result: Either[Throwable, T]): Unit = + try func(result) catch { case NonFatal(e) ⇒ executor reportFailure e } } /** @@ -871,7 +873,7 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe val value = Some(resolve(suppliedValue)) def tryComplete(value: Either[Throwable, T]): Boolean = false - def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = { + def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { val completedAs = value.get Future dispatchTask (() ⇒ func(completedAs)) this @@ -982,7 +984,7 @@ abstract class Recover[+T] extends japi.RecoverBridge[T] { * This method will be invoked once when/if the Future this recover callback is registered on * becomes completed with a failure. * - * @returns a successful value for the passed in failure + * @return a successful value for the passed in failure * @throws the passed in failure to propagate it. * * Java API @@ -1005,7 +1007,7 @@ abstract class Filter[-T] extends japi.BooleanFunctionBridge[T] { * This method will be invoked once when/if a Future that this callback is registered on * becomes completed with a success. * - * @returns true if the successful value should be propagated to the new Future or not + * @return true if the successful value should be propagated to the new Future or not */ def filter(result: T): Boolean } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index e181d7d0e0..1c95843977 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -333,14 +333,14 @@ trait MailboxType { * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - override def create(receiver: ActorContext): Mailbox = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } } case class UnboundedDequeBasedMailbox(config: Config) extends MailboxType { - override def create(receiver: ActorContext): Mailbox = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new LinkedBlockingDeque[Envelope]() } @@ -351,7 +351,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new LinkedBlockingQueue[Envelope](capacity) final val pushTimeOut = BoundedMailbox.this.pushTimeOut @@ -359,7 +359,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new PriorityBlockingQueue[Envelope](11, cmp) } @@ -370,7 +370,7 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorContext) = + final override def create(receiver: ActorContext): Mailbox = new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index e89adde8fb..fbcd5e9f9e 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -8,14 +8,21 @@ import akka.AkkaException import akka.util.ReflectiveAccess import scala.util.DynamicVariable import com.typesafe.config.Config -import akka.config.ConfigurationException import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address } import java.util.concurrent.ConcurrentHashMap import akka.event.Logging +import scala.collection.mutable.ArrayBuffer +import java.io.NotSerializableException case class NoSerializerFoundException(m: String) extends AkkaException(m) object Serialization { + + /** + * Tuple that represents mapping from Class to Serializer + */ + type ClassSerializer = (Class[_], Serializer) + /** * This holds a reference to the current ActorSystem (the surrounding context) * during serialization and deserialization. @@ -40,28 +47,19 @@ object Serialization { import scala.collection.JavaConverters._ import config._ - val Serializers: Map[String, String] = - getConfig("akka.actor.serializers").root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } + val Serializers: Map[String, String] = configToMap(getConfig("akka.actor.serializers")) - val SerializationBindings: Map[String, Seq[String]] = { - val configPath = "akka.actor.serialization-bindings" - hasPath(configPath) match { - case false ⇒ Map() - case true ⇒ - val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).root.unwrapped.asScala.toMap.map { - case (k: String, v: java.util.Collection[_]) ⇒ (k -> v.asScala.toSeq.asInstanceOf[Seq[String]]) - case invalid ⇒ throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid)) - } - serializationBindings + val SerializationBindings: Map[String, String] = configToMap(getConfig("akka.actor.serialization-bindings")) + + private def configToMap(cfg: Config): Map[String, String] = + cfg.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } - } - } } } /** * Serialization module. Contains methods for serialization and deserialization as well as - * locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. + * locating a Serializer for a particular class as defined in the mapping in the configuration. */ class Serialization(val system: ExtendedActorSystem) extends Extension { import Serialization._ @@ -105,8 +103,10 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { } catch { case e: Exception ⇒ Left(e) } /** - * Returns the Serializer configured for the given object, returns the NullSerializer if it's null, - * falls back to the Serializer named "default" + * Returns the Serializer configured for the given object, returns the NullSerializer if it's null. + * + * @throws akka.config.ConfigurationException if no `serialization-bindings` is configured for the + * class of the object */ def findSerializerFor(o: AnyRef): Serializer = o match { case null ⇒ NullSerializer @@ -114,82 +114,94 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { } /** - * Returns the configured Serializer for the given Class, falls back to the Serializer named "default". - * It traverses interfaces and super classes to find any configured Serializer that match - * the class name. + * Returns the configured Serializer for the given Class. The configured Serializer + * is used if the configured class `isAssignableFrom` from the `clazz`, i.e. + * the configured class is a super class or implemented interface. In case of + * ambiguity it is primarily using the most specific configured class, + * and secondly the entry configured first. + * + * @throws java.io.NotSerializableException if no `serialization-bindings` is configured for the class */ def serializerFor(clazz: Class[_]): Serializer = - if (bindings.isEmpty) { - // quick path to default when no bindings are registered - serializers("default") - } else { + serializerMap.get(clazz) match { + case null ⇒ + // bindings are ordered from most specific to least specific + def unique(possibilities: Seq[(Class[_], Serializer)]): Boolean = + possibilities.size == 1 || + (possibilities map (_._1) forall (_ isAssignableFrom possibilities(0)._1)) || + (possibilities map (_._2) forall (_ == possibilities(0)._2)) - def resolve(c: Class[_]): Option[Serializer] = - serializerMap.get(c.getName) match { - case null ⇒ - val classes = c.getInterfaces ++ Option(c.getSuperclass) - classes.view map resolve collectFirst { case Some(x) ⇒ x } - case x ⇒ Some(x) + val ser = bindings filter { _._1 isAssignableFrom clazz } match { + case Seq() ⇒ + throw new NotSerializableException("No configured serialization-bindings for class [%s]" format clazz.getName) + case possibilities ⇒ + if (!unique(possibilities)) + log.warning("Multiple serializers found for " + clazz + ", choosing first: " + possibilities) + possibilities(0)._2 } - - serializerMap.get(clazz.getName) match { - case null ⇒ - val ser = resolve(clazz).getOrElse(serializers("default")) - // memorize the lookups for performance - serializerMap.putIfAbsent(clazz.getName, ser) match { - case null ⇒ - log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) - ser - case some ⇒ some - } - case ser ⇒ ser - } + serializerMap.putIfAbsent(clazz, ser) match { + case null ⇒ + log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) + ser + case some ⇒ some + } + case ser ⇒ ser } /** - * Tries to load the specified Serializer by the FQN + * Tries to instantiate the specified Serializer by the FQN */ def serializerOf(serializerFQN: String): Either[Exception, Serializer] = - ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs) + ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs, system.internalClassLoader) /** * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) - * By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer - * But "default" can be overridden in config + * By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer */ - lazy val serializers: Map[String, Serializer] = { - val serializersConf = settings.Serializers - for ((k: String, v: String) ← serializersConf) + private val serializers: Map[String, Serializer] = { + for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).fold(throw _, identity) } /** - * bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used + * bindings is a Seq of tuple representing the mapping from Class to Serializer. + * It is primarily ordered by the most specific classes first, and secondly in the configured order. */ - lazy val bindings: Map[String, String] = { - settings.SerializationBindings.foldLeft(Map[String, String]()) { - //All keys which are lists, take the Strings from them and Map them - case (result, (k: String, vs: Seq[_])) ⇒ result ++ (vs collect { case v: String ⇒ (v, k) }) - //For any other values, just skip them - case (result, _) ⇒ result + private[akka] val bindings: Seq[ClassSerializer] = { + val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield { + val c = ReflectiveAccess.getClassFor(k, system.internalClassLoader).fold(throw _, identity[Class[_]]) + (c, serializers(v)) } + sort(configuredBindings) } /** - * serializerMap is a Map whose keys = FQN of class that is serializable and values is the serializer to be used for that class + * Sort so that subtypes always precede their supertypes, but without + * obeying any order between unrelated subtypes (insert sort). */ - private lazy val serializerMap: ConcurrentHashMap[String, Serializer] = { - val serializerMap = new ConcurrentHashMap[String, Serializer] - for ((k, v) ← bindings) { - serializerMap.put(k, serializers(v)) + private def sort(in: Iterable[ClassSerializer]): Seq[ClassSerializer] = + (new ArrayBuffer[ClassSerializer](in.size) /: in) { (buf, ca) ⇒ + buf.indexWhere(_._1 isAssignableFrom ca._1) match { + case -1 ⇒ buf append ca + case x ⇒ buf insert (x, ca) + } + buf } + + /** + * serializerMap is a Map whose keys is the class that is serializable and values is the serializer + * to be used for that class. + */ + private val serializerMap: ConcurrentHashMap[Class[_], Serializer] = { + val serializerMap = new ConcurrentHashMap[Class[_], Serializer] + for ((c, s) ← bindings) serializerMap.put(c, s) serializerMap } /** * Maps from a Serializer Identity (Int) to a Serializer instance (optimization) */ - lazy val serializerByIdentity: Map[Int, Serializer] = + val serializerByIdentity: Map[Int, Serializer] = Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) ⇒ (v.identifier, v) } } diff --git a/akka-actor/src/main/scala/akka/util/NonFatal.scala b/akka-actor/src/main/scala/akka/util/NonFatal.scala index 36bb1960e9..ae7d91c7a3 100644 --- a/akka-actor/src/main/scala/akka/util/NonFatal.scala +++ b/akka-actor/src/main/scala/akka/util/NonFatal.scala @@ -5,8 +5,9 @@ package akka.util /** * Extractor of non-fatal Throwables. Will not match fatal errors - * like VirtualMachineError (OutOfMemoryError, StackOverflowError) - * ThreadDeath, and InterruptedException. + * like VirtualMachineError (OutOfMemoryError) + * ThreadDeath, LinkageError and InterruptedException. + * StackOverflowError is matched, i.e. considered non-fatal. * * Usage to catch all harmless throwables: * {{{ @@ -20,8 +21,9 @@ package akka.util object NonFatal { def unapply(t: Throwable): Option[Throwable] = t match { - // VirtualMachineError includes OutOfMemoryError, StackOverflowError and other fatal errors - case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException ⇒ None + case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError + // VirtualMachineError includes OutOfMemoryError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None case e ⇒ Some(e) } diff --git a/akka-agent/src/main/resources/reference.conf b/akka-agent/src/main/resources/reference.conf index 67da6e3821..7009c0f432 100644 --- a/akka-agent/src/main/resources/reference.conf +++ b/akka-agent/src/main/resources/reference.conf @@ -10,11 +10,13 @@ akka { # The dispatcher used for agent-send-off actor send-off-dispatcher { + executor = thread-pool-executor type = PinnedDispatcher } # The dispatcher used for agent-alter-off actor alter-off-dispatcher { + executor = thread-pool-executor type = PinnedDispatcher } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala index 1b9026d082..bb15223842 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala @@ -369,7 +369,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { /** * Gossips to a random member in the set of members passed in as argument. * - * @returns 'true' if it gossiped to a "seed" member. + * @return 'true' if it gossiped to a "seed" member. */ private def gossipToRandomNodeOf(members: Set[Member]): Boolean = { val peers = members filter (_.address != address) // filter out myself diff --git a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java index b68f8f2e79..3db385ca1c 100644 --- a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java @@ -54,61 +54,7 @@ public class SerializationDocTestBase { } } //#my-own-serializer - @Test public void haveExamples() { - /* - //#serialize-messages-config - akka { - actor { - serialize-messages = on - } - } - //#serialize-messages-config - //#serialize-creators-config - akka { - actor { - serialize-creators = on - } - } - //#serialize-creators-config - - - //#serialize-serializers-config - akka { - actor { - serializers { - default = "akka.serialization.JavaSerializer" - - myown = "akka.docs.serialization.MyOwnSerializer" - } - } - } - //#serialize-serializers-config - - //#serialization-bindings-config - akka { - actor { - serializers { - default = "akka.serialization.JavaSerializer" - java = "akka.serialization.JavaSerializer" - proto = "akka.serialization.ProtobufSerializer" - myown = "akka.docs.serialization.MyOwnSerializer" - } - - serialization-bindings { - java = ["java.lang.String", - "app.my.Customer"] - proto = ["com.google.protobuf.Message"] - myown = ["my.own.BusinessObject", - "something.equally.Awesome", - "akka.docs.serialization.MyOwnSerializable" - "java.lang.Boolean"] - } - } - } - //#serialization-bindings-config - */ - } @Test public void demonstrateTheProgrammaticAPI() { //#programmatic diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index fd117f65f9..fceb94abbc 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -55,6 +55,17 @@ Default values are taken from ``default-dispatcher``, i.e. all options doesn't n :ref:`configuration` for the default values of the ``default-dispatcher``. You can also override the values for the ``default-dispatcher`` in your configuration. +There are two different executor services: + +* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for + ``default-dispatcher``. +* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``. + +Note that the pool size is configured differently for the two executor services. The configuration above +is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config + Let's now walk through the different dispatchers in more detail. Thread-based @@ -67,9 +78,11 @@ has worse performance and scalability than the event-based dispatcher but works a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other. -The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this: +The ``PinnedDispatcher`` is configured like this: -.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config + +Note that it must be used with ``executor = "thread-pool-executor"``. Event-based ^^^^^^^^^^^ diff --git a/akka-docs/java/remoting.rst b/akka-docs/java/remoting.rst index 196d7a40a5..9e0eb4eec9 100644 --- a/akka-docs/java/remoting.rst +++ b/akka-docs/java/remoting.rst @@ -26,7 +26,7 @@ to your ``application.conf`` file:: } remote { transport = "akka.remote.netty.NettyRemoteTransport" - server { + netty { hostname = "127.0.0.1" port = 2552 } diff --git a/akka-docs/java/serialization.rst b/akka-docs/java/serialization.rst index 2920538ded..e44d69f162 100644 --- a/akka-docs/java/serialization.rst +++ b/akka-docs/java/serialization.rst @@ -25,47 +25,39 @@ For Akka to know which ``Serializer`` to use for what, you need edit your :ref:` in the "akka.actor.serializers"-section you bind names to implementations of the ``akka.serialization.Serializer`` you wish to use, like this: -.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-serializers-config - -.. note:: - - The name ``default`` is special in the sense that the ``Serializer`` - mapped to it will be used as default. +.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config After you've bound names to different implementations of ``Serializer`` you need to wire which classes should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section: -.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialization-bindings-config +.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config -.. note:: +You only need to specify the name of an interface or abstract base class of the +messages. In case of ambiguity, i.e. the message implements several of the +configured classes, the most specific configured class will be used, i.e. the +one of which all other candidates are superclasses. If this condition cannot be +met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply +and neither is a subtype of the other, a warning will be issued. - You only need to specify the name of an interface or abstract base class if the messages implements - that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. +Akka provides serializers for :class:`java.io.Serializable` and `protobuf +`_ +:class:`com.google.protobuf.GeneratedMessage` by default (the latter only if +depending on the akka-remote module), so normally you don't need to add +configuration for that; since :class:`com.google.protobuf.GeneratedMessage` +implements :class:`java.io.Serializable`, protobuf messages will always by +serialized using the protobuf protocol unless specifically overridden. In order +to disable a default serializer, map its marker type to “none”:: -Protobuf --------- - -Akka provides a ``Serializer`` for `protobuf `_ messages. -To use that you need to add the following to the configuration:: - - akka { - actor { - serializers { - proto = "akka.serialization.ProtobufSerializer" - } - - serialization-bindings { - proto = ["com.google.protobuf.Message"] - } - } - } + akka.actor.serialization-bindings { + "java.io.Serializable" = none + } Verification ------------ If you want to verify that your messages are serializable you can enable the following config option: -.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-messages-config +.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-messages-config .. warning:: @@ -74,7 +66,7 @@ If you want to verify that your messages are serializable you can enable the fol If you want to verify that your ``Props`` are serializable you can enable the following config option: -.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-creators-config +.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-creators-config .. warning:: @@ -110,4 +102,4 @@ which is done by extending ``akka.serialization.JSerializer``, like this: :exclude: ... Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then -list which classes that should be serialized using it. \ No newline at end of file +list which classes that should be serialized using it. diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index 0df4e3ca5b..cd57fbeddc 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -20,6 +20,27 @@ object DispatcherDocSpec { val config = """ //#my-dispatcher-config my-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the number of messages that are processed in a batch before the + # thread is returned to the pool. Set to 1 for as fair as possible. + throughput = 100 + } + //#my-dispatcher-config + + //#my-thread-pool-dispatcher-config + my-thread-pool-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use @@ -37,7 +58,14 @@ object DispatcherDocSpec { # thread is returned to the pool. Set to 1 for as fair as possible. throughput = 100 } - //#my-dispatcher-config + //#my-thread-pool-dispatcher-config + + //#my-pinned-dispatcher-config + my-pinned-dispatcher { + executor = "thread-pool-executor" + type = PinnedDispatcher + } + //#my-pinned-dispatcher-config //#my-bounded-config my-dispatcher-bounded-queue { diff --git a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala index 7f1553f75c..ce40adce3e 100644 --- a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala @@ -45,6 +45,9 @@ class MyOwnSerializer extends Serializer { } //#my-own-serializer +trait MyOwnSerializable +case class Customer(name: String) extends MyOwnSerializable + class SerializationDocSpec extends AkkaSpec { "demonstrate configuration of serialize messages" in { //#serialize-messages-config @@ -82,8 +85,8 @@ class SerializationDocSpec extends AkkaSpec { akka { actor { serializers { - default = "akka.serialization.JavaSerializer" - + java = "akka.serialization.JavaSerializer" + proto = "akka.serialization.ProtobufSerializer" myown = "akka.docs.serialization.MyOwnSerializer" } } @@ -91,8 +94,6 @@ class SerializationDocSpec extends AkkaSpec { """) //#serialize-serializers-config val a = ActorSystem("system", config) - SerializationExtension(a).serializers("default").getClass.getName must equal("akka.serialization.JavaSerializer") - SerializationExtension(a).serializers("myown").getClass.getName must equal("akka.docs.serialization.MyOwnSerializer") a.shutdown() } @@ -102,31 +103,26 @@ class SerializationDocSpec extends AkkaSpec { akka { actor { serializers { - default = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer" proto = "akka.serialization.ProtobufSerializer" myown = "akka.docs.serialization.MyOwnSerializer" } serialization-bindings { - java = ["java.lang.String", - "app.my.Customer"] - proto = ["com.google.protobuf.Message"] - myown = ["my.own.BusinessObject", - "something.equally.Awesome", - "akka.docs.serialization.MyOwnSerializable" - "java.lang.Boolean"] - } + "java.lang.String" = java + "akka.docs.serialization.Customer" = java + "com.google.protobuf.Message" = proto + "akka.docs.serialization.MyOwnSerializable" = myown + "java.lang.Boolean" = myown + } } } """) //#serialization-bindings-config val a = ActorSystem("system", config) - SerializationExtension(a).serializers("default").getClass.getName must equal("akka.serialization.JavaSerializer") - SerializationExtension(a).serializers("java").getClass.getName must equal("akka.serialization.JavaSerializer") - SerializationExtension(a).serializers("myown").getClass.getName must equal("akka.docs.serialization.MyOwnSerializer") - SerializationExtension(a).serializerFor(classOf[String]).getClass.getName must equal("akka.serialization.JavaSerializer") - SerializationExtension(a).serializerFor(classOf[java.lang.Boolean]).getClass.getName must equal("akka.docs.serialization.MyOwnSerializer") + SerializationExtension(a).serializerFor(classOf[String]).getClass must equal(classOf[JavaSerializer]) + SerializationExtension(a).serializerFor(classOf[Customer]).getClass must equal(classOf[JavaSerializer]) + SerializationExtension(a).serializerFor(classOf[java.lang.Boolean]).getClass must equal(classOf[MyOwnSerializer]) a.shutdown() } diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index c9923cf459..c6e6ae23e3 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -54,6 +54,17 @@ Default values are taken from ``default-dispatcher``, i.e. all options doesn't n :ref:`configuration` for the default values of the ``default-dispatcher``. You can also override the values for the ``default-dispatcher`` in your configuration. +There are two different executor services: + +* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for + ``default-dispatcher``. +* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``. + +Note that the pool size is configured differently for the two executor services. The configuration above +is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``: + +.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config + Let's now walk through the different dispatchers in more detail. Thread-based @@ -66,9 +77,11 @@ has worse performance and scalability than the event-based dispatcher but works a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with this dispatcher is that Actors do not block threads for each other. -The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this: +The ``PinnedDispatcher`` is configured like this: -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher +.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config + +Note that it must be used with ``executor = "thread-pool-executor"``. Event-based ^^^^^^^^^^^ diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index ae2aa0f411..f874e15a1b 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -27,7 +27,7 @@ to your ``application.conf`` file:: } remote { transport = "akka.remote.netty.NettyRemoteTransport" - server { + netty { hostname = "127.0.0.1" port = 2552 } diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst index 6a0867dea2..4735548140 100644 --- a/akka-docs/scala/serialization.rst +++ b/akka-docs/scala/serialization.rst @@ -27,38 +27,30 @@ you wish to use, like this: .. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config -.. note:: - - The name ``default`` is special in the sense that the ``Serializer`` - mapped to it will be used as default. - After you've bound names to different implementations of ``Serializer`` you need to wire which classes should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section: .. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config -.. note:: +You only need to specify the name of an interface or abstract base class of the +messages. In case of ambiguity, i.e. the message implements several of the +configured classes, the most specific configured class will be used, i.e. the +one of which all other candidates are superclasses. If this condition cannot be +met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply +and neither is a subtype of the other, a warning will be issued - You only need to specify the name of an interface or abstract base class if the messages implements - that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. +Akka provides serializers for :class:`java.io.Serializable` and `protobuf +`_ +:class:`com.google.protobuf.GeneratedMessage` by default (the latter only if +depending on the akka-remote module), so normally you don't need to add +configuration for that; since :class:`com.google.protobuf.GeneratedMessage` +implements :class:`java.io.Serializable`, protobuf messages will always by +serialized using the protobuf protocol unless specifically overridden. In order +to disable a default serializer, map its marker type to “none”:: -Protobuf --------- - -Akka provides a ``Serializer`` for `protobuf `_ messages. -To use that you need to add the following to the configuration:: - - akka { - actor { - serializers { - proto = "akka.serialization.ProtobufSerializer" - } - - serialization-bindings { - proto = ["com.google.protobuf.Message"] - } - } - } + akka.actor.serialization-bindings { + "java.io.Serializable" = none + } Verification ------------ @@ -108,4 +100,4 @@ First you need to create a class definition of your ``Serializer`` like so: :exclude: ... Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then -list which classes that should be serialized using it. \ No newline at end of file +list which classes that should be serialized using it. diff --git a/akka-kernel/src/main/dist/config/application.conf b/akka-kernel/src/main/dist/config/application.conf index 2f7ad95abd..682d41e484 100644 --- a/akka-kernel/src/main/dist/config/application.conf +++ b/akka-kernel/src/main/dist/config/application.conf @@ -1,2 +1,3 @@ # In this file you can override any option defined in the 'reference.conf' files. # Copy in all or parts of the 'reference.conf' files and modify as you please. +# For more info about config, please visit the Akka Documentation: http://akka.io/docs/akka/2.0-SNAPSHOT/ diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 1158d12295..017b531faa 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -5,10 +5,24 @@ # This the reference config file has all the default settings. # Make your edits/overrides in your application.conf. +# comments above akka.actor settings left out where they are already in akka- +# actor.jar, because otherwise they would be repeated in config rendering. + akka { actor { + serializers { + proto = "akka.serialization.ProtobufSerializer" + } + + + serialization-bindings { + # Since com.google.protobuf.Message does not extend Serializable but GeneratedMessage + # does, need to use the more specific one here in order to avoid ambiguity + "com.google.protobuf.GeneratedMessage" = proto + } + deployment { default { @@ -133,6 +147,7 @@ akka { # The dispatcher used for the system actor "network-event-sender" network-event-sender-dispatcher { + executor = thread-pool-executor type = PinnedDispatcher } } diff --git a/akka-remote/src/test/scala/akka/serialization/ProtobufSerializerSpec.scala b/akka-remote/src/test/scala/akka/serialization/ProtobufSerializerSpec.scala new file mode 100644 index 0000000000..474ef485d7 --- /dev/null +++ b/akka-remote/src/test/scala/akka/serialization/ProtobufSerializerSpec.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.serialization + +import akka.testkit.AkkaSpec +import akka.remote.RemoteProtocol.MessageProtocol +import akka.actor.ProtobufProtocol.MyMessage + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ProtobufSerializerSpec extends AkkaSpec { + + val ser = SerializationExtension(system) + + "Serialization" must { + + "resolve protobuf serializer" in { + ser.serializerFor(classOf[MessageProtocol]).getClass must be(classOf[ProtobufSerializer]) + ser.serializerFor(classOf[MyMessage]).getClass must be(classOf[ProtobufSerializer]) + } + + } +} + diff --git a/akka-zeromq/src/main/resources/reference.conf b/akka-zeromq/src/main/resources/reference.conf index 54922b8386..cfb5756156 100644 --- a/akka-zeromq/src/main/resources/reference.conf +++ b/akka-zeromq/src/main/resources/reference.conf @@ -15,6 +15,7 @@ akka { socket-dispatcher { # A zeromq socket needs to be pinned to the thread that created it. # Changing this value results in weird errors and race conditions within zeromq + executor = thread-pool-executor type = "PinnedDispatcher" } }