Fix merge conflict

This commit is contained in:
Philipp Haller 2012-02-07 17:04:38 +01:00
commit ab60681a17
31 changed files with 405 additions and 302 deletions

View file

@ -14,6 +14,7 @@ import akka.util.duration._
object SupervisorMiscSpec { object SupervisorMiscSpec {
val config = """ val config = """
pinned-dispatcher { pinned-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher
} }
test-dispatcher { test-dispatcher {

View file

@ -435,6 +435,7 @@ object DispatcherModelSpec {
val config = { val config = {
""" """
boss { boss {
executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher
} }
""" + """ +
@ -506,6 +507,7 @@ object BalancingDispatcherModelSpec {
val config = { val config = {
""" """
boss { boss {
executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher
} }
""" + """ +

View file

@ -12,6 +12,7 @@ import akka.pattern.ask
object PinnedActorSpec { object PinnedActorSpec {
val config = """ val config = """
pinned-dispatcher { pinned-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher
} }
""" """

View file

@ -34,6 +34,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
getMilliseconds("akka.scheduler.tickDuration") must equal(100) getMilliseconds("akka.scheduler.tickDuration") must equal(100)
settings.SchedulerTickDuration must equal(100 millis) settings.SchedulerTickDuration must equal(100 millis)
settings.Daemonicity must be(false)
settings.JvmExitOnFatalError must be(true)
} }
{ {

View file

@ -4,8 +4,7 @@
package akka.serialization package akka.serialization
import akka.testkit.AkkaSpec import akka.testkit.{ AkkaSpec, EventFilter }
import com.typesafe.config.ConfigFactory
import akka.actor._ import akka.actor._
import java.io._ import java.io._
import akka.dispatch.Await import akka.dispatch.Await
@ -17,21 +16,25 @@ import akka.pattern.ask
object SerializeSpec { object SerializeSpec {
val serializationConf = ConfigFactory.parseString(""" val config = """
akka { akka {
actor { actor {
serializers { serializers {
java = "akka.serialization.JavaSerializer"
test = "akka.serialization.TestSerializer" test = "akka.serialization.TestSerializer"
} }
serialization-bindings { serialization-bindings {
java = ["akka.serialization.SerializeSpec$Person", "akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] "akka.serialization.SerializeSpec$Person" = java
test = ["akka.serialization.TestSerializble", "akka.serialization.SerializeSpec$PlainMessage"] "akka.serialization.SerializeSpec$Address" = java
"akka.serialization.TestSerializble" = test
"akka.serialization.SerializeSpec$PlainMessage" = test
"akka.serialization.SerializeSpec$A" = java
"akka.serialization.SerializeSpec$B" = test
"akka.serialization.SerializeSpec$D" = test
} }
} }
} }
""") """
@BeanInfo @BeanInfo
case class Address(no: String, street: String, city: String, zip: String) { def this() = this("", "", "", "") } case class Address(no: String, street: String, city: String, zip: String) { def this() = this("", "", "", "") }
@ -54,10 +57,18 @@ object SerializeSpec {
class ExtendedPlainMessage extends PlainMessage class ExtendedPlainMessage extends PlainMessage
class Both(s: String) extends SimpleMessage(s) with Serializable
trait A
trait B
class C extends B with A
class D extends A
class E extends D
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
import SerializeSpec._ import SerializeSpec._
val ser = SerializationExtension(system) val ser = SerializationExtension(system)
@ -69,8 +80,8 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
"Serialization" must { "Serialization" must {
"have correct bindings" in { "have correct bindings" in {
ser.bindings(addr.getClass.getName) must be("java") ser.bindings.collectFirst { case (c, s) if c == addr.getClass s.getClass } must be(Some(classOf[JavaSerializer]))
ser.bindings(classOf[PlainMessage].getName) must be("test") ser.bindings.collectFirst { case (c, s) if c == classOf[PlainMessage] s.getClass } must be(Some(classOf[TestSerializer]))
} }
"serialize Address" in { "serialize Address" in {
@ -144,58 +155,68 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
} }
} }
"resove serializer by direct interface" in { "resolve serializer by direct interface" in {
val msg = new SimpleMessage("foo") ser.serializerFor(classOf[SimpleMessage]).getClass must be(classOf[TestSerializer])
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
} }
"resove serializer by interface implemented by super class" in { "resolve serializer by interface implemented by super class" in {
val msg = new ExtendedSimpleMessage("foo", 17) ser.serializerFor(classOf[ExtendedSimpleMessage]).getClass must be(classOf[TestSerializer])
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
} }
"resove serializer by indirect interface" in { "resolve serializer by indirect interface" in {
val msg = new AnotherMessage ser.serializerFor(classOf[AnotherMessage]).getClass must be(classOf[TestSerializer])
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
} }
"resove serializer by indirect interface implemented by super class" in { "resolve serializer by indirect interface implemented by super class" in {
val msg = new ExtendedAnotherMessage ser.serializerFor(classOf[ExtendedAnotherMessage]).getClass must be(classOf[TestSerializer])
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
} }
"resove serializer for message with binding" in { "resolve serializer for message with binding" in {
val msg = new PlainMessage ser.serializerFor(classOf[PlainMessage]).getClass must be(classOf[TestSerializer])
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
} }
"resove serializer for message extending class with with binding" in { "resolve serializer for message extending class with with binding" in {
val msg = new ExtendedPlainMessage ser.serializerFor(classOf[ExtendedPlainMessage]).getClass must be(classOf[TestSerializer])
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) }
"give warning for message with several bindings" in {
EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept {
ser.serializerFor(classOf[Both]).getClass must be(classOf[TestSerializer])
}
}
"resolve serializer in the order of the bindings" in {
ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer])
ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer])
EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept {
ser.serializerFor(classOf[C]).getClass must be(classOf[JavaSerializer])
}
}
"resolve serializer in the order of most specific binding first" in {
ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer])
ser.serializerFor(classOf[D]).getClass must be(classOf[TestSerializer])
ser.serializerFor(classOf[E]).getClass must be(classOf[TestSerializer])
}
"throw java.io.NotSerializableException when no binding" in {
intercept[java.io.NotSerializableException] {
ser.serializerFor(classOf[Actor])
}
} }
} }
} }
object VerifySerializabilitySpec { object VerifySerializabilitySpec {
val conf = ConfigFactory.parseString(""" val conf = """
akka { akka {
actor { actor {
serialize-messages = on serialize-messages = on
serialize-creators = on serialize-creators = on
serializers {
java = "akka.serialization.JavaSerializer"
default = "akka.serialization.JavaSerializer"
}
serialization-bindings {
java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
}
} }
} }
""") """
class FooActor extends Actor { class FooActor extends Actor {
def receive = { def receive = {
@ -210,6 +231,7 @@ object VerifySerializabilitySpec {
} }
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) { class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) {
import VerifySerializabilitySpec._ import VerifySerializabilitySpec._
implicit val timeout = Timeout(5 seconds) implicit val timeout = Timeout(5 seconds)

View file

@ -16,6 +16,7 @@ object CallingThreadDispatcherModelSpec {
val config = { val config = {
""" """
boss { boss {
executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher
} }
""" + """ +

View file

@ -19,18 +19,16 @@ class NonFatalSpec extends AkkaSpec with MustMatchers {
} }
} }
"not match StackOverflowError" in { "match StackOverflowError" in {
//not @tailrec //not @tailrec
def blowUp(n: Long): Long = { def blowUp(n: Long): Long = {
blowUp(n + 1) + 1 blowUp(n + 1) + 1
} }
intercept[StackOverflowError] { try {
try { blowUp(0)
blowUp(0) } catch {
} catch { case NonFatal(e) // as expected
case NonFatal(e) assert(false)
}
} }
} }

View file

@ -36,6 +36,9 @@ akka {
# Toggles whether the threads created by this ActorSystem should be daemons or not # Toggles whether the threads created by this ActorSystem should be daemons or not
daemonic = off daemonic = off
# JVM shutdown, System.exit(-1), in case of a fatal error, such as OutOfMemoryError
jvmExitOnFatalError = on
actor { actor {
provider = "akka.actor.LocalActorRefProvider" provider = "akka.actor.LocalActorRefProvider"
@ -97,7 +100,7 @@ akka {
paths = [] paths = []
} }
# Routers with dynamically resizable number of routees; this feature is enabled # Routers with dynamically resizable number of routees; this feature is enabled
# by including (parts of) this section in the deployment # by including (parts of) this section in the deployment
resizer { resizer {
@ -156,7 +159,8 @@ akka {
# the same type), PinnedDispatcher, or a FQCN to a class inheriting # the same type), PinnedDispatcher, or a FQCN to a class inheriting
# MessageDispatcherConfigurator with a constructor with # MessageDispatcherConfigurator with a constructor with
# com.typesafe.config.Config parameter and akka.dispatch.DispatcherPrerequisites # com.typesafe.config.Config parameter and akka.dispatch.DispatcherPrerequisites
# parameters # parameters.
# PinnedDispatcher must be used toghether with executor=thread-pool-executor.
type = "Dispatcher" type = "Dispatcher"
# Which kind of ExecutorService to use for this dispatcher # Which kind of ExecutorService to use for this dispatcher
@ -262,23 +266,20 @@ akka {
event-stream = off event-stream = off
} }
# Entries for pluggable serializers and their bindings. If a binding for a specific # Entries for pluggable serializers and their bindings.
# class is not found, then the default serializer (Java serialization) is used.
serializers { serializers {
# java = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer"
# proto = "akka.serialization.ProtobufSerializer"
default = "akka.serialization.JavaSerializer"
} }
# serialization-bindings { # Class to Serializer binding. You only need to specify the name of an interface
# java = ["akka.serialization.SerializeSpec$Address", # or abstract base class of the messages. In case of ambiguity it is using the
# "akka.serialization.MyJavaSerializableActor", # most specific configured class, or giving a warning and choosing the “first” one.
# "akka.serialization.MyStatelessActorWithMessagesInMailbox", #
# "akka.serialization.MyActorWithProtobufMessagesInMailbox"] # To disable one of the default serializers, assign its class to "none", like
# proto = ["com.google.protobuf.Message", # "java.io.Serializable" = none
# "akka.actor.ProtobufProtocol$MyMessage"] serialization-bindings {
# } "java.io.Serializable" = java
}
} }
# Used to set the behavior of the scheduler. # Used to set the behavior of the scheduler.

View file

@ -23,6 +23,7 @@ object AkkaException {
sb.append("\tat %s\n" format trace(i)) sb.append("\tat %s\n" format trace(i))
sb.toString sb.toString
} }
} }
/** /**

View file

@ -476,10 +476,7 @@ private[akka] class ActorCell(
cancelReceiveTimeout() // FIXME: leave this here??? cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match { messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle) case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
// FIXME: actor can be null when creation fails with fatal error, why? case msg actor(msg)
case msg if actor == null
system.eventStream.publish(Warning(self.path.toString, this.getClass, "Ignoring message due to null actor [%s]" format msg))
case msg actor(msg)
} }
currentMessage = null // reset current message after successful invocation currentMessage = null // reset current message after successful invocation
} catch { } catch {

View file

@ -92,6 +92,7 @@ object ActorSystem {
final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
final val Daemonicity = getBoolean("akka.daemonic") final val Daemonicity = getBoolean("akka.daemonic")
final val JvmExitOnFatalError = getBoolean("akka.jvmExitOnFatalError")
if (ConfigVersion != Version) if (ConfigVersion != Version)
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
@ -348,6 +349,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
log.error(cause, "Uncaught error from thread [{}]", thread.getName) log.error(cause, "Uncaught error from thread [{}]", thread.getName)
cause match { cause match {
case NonFatal(_) | _: InterruptedException case NonFatal(_) | _: InterruptedException
case _ if settings.JvmExitOnFatalError System.exit(-1)
case _ shutdown() case _ shutdown()
} }
} }

View file

@ -14,9 +14,9 @@ import akka.event.EventStream
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.ReflectiveAccess import akka.util.ReflectiveAccess
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.jsr166y.ForkJoinPool
import akka.util.NonFatal import akka.util.NonFatal
import akka.event.Logging.LogEventException import akka.event.Logging.LogEventException
import akka.jsr166y.{ ForkJoinTask, ForkJoinPool }
final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
if (message.isInstanceOf[AnyRef]) { if (message.isInstanceOf[AnyRef]) {
@ -424,7 +424,41 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
threadPoolConfig.createExecutorServiceFactory(name, threadFactory) threadPoolConfig.createExecutorServiceFactory(name, threadFactory)
} }
object ForkJoinExecutorConfigurator {
/**
* INTERNAL AKKA USAGE ONLY
*/
final class AkkaForkJoinPool(parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) {
override def execute(r: Runnable): Unit = r match {
case m: Mailbox super.execute(new MailboxExecutionTask(m))
case other super.execute(other)
}
}
/**
* INTERNAL AKKA USAGE ONLY
*/
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] {
final override def setRawResult(u: Unit): Unit = ()
final override def getRawResult(): Unit = ()
final override def exec(): Boolean = try { mailbox.run; true } catch {
case anything
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null
case some some.uncaughtException(t, anything)
}
throw anything
}
}
}
class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
import ForkJoinExecutorConfigurator._
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match { def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = prerequisites.threadFactory match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory correct case correct: ForkJoinPool.ForkJoinWorkerThreadFactory correct
@ -433,7 +467,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int) extends ExecutorServiceFactory { val parallelism: Int) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = new ForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, true) def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing)
} }
final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
new ForkJoinExecutorServiceFactory( new ForkJoinExecutorServiceFactory(

View file

@ -8,18 +8,18 @@ import akka.event.Logging.Error
import scala.Option import scala.Option
import akka.japi.{ Function JFunc, Option JOption } import akka.japi.{ Function JFunc, Option JOption }
import scala.util.continuations._ import scala.util.continuations._
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import java.util.{ LinkedList JLinkedList } import java.util.{ LinkedList JLinkedList }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.mutable.Stack import scala.collection.mutable.Stack
import akka.util.{ Duration, BoxedType } import akka.util.{ Duration, BoxedType }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger }
import akka.dispatch.Await.CanAwait import akka.dispatch.Await.CanAwait
import java.util.concurrent._
import akka.util.NonFatal import akka.util.NonFatal
import akka.event.Logging.LogEventException import akka.event.Logging.LogEventException
import akka.event.Logging.Debug import akka.event.Logging.Debug
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.concurrent.{ ExecutionException, Callable, TimeoutException }
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater }
object Await { object Await {
@ -53,7 +53,7 @@ object Await {
* WARNING: Blocking operation, use with caution. * WARNING: Blocking operation, use with caution.
* *
* @throws [[java.util.concurrent.TimeoutException]] if times out * @throws [[java.util.concurrent.TimeoutException]] if times out
* @returns The returned value as returned by Awaitable.ready * @return The returned value as returned by Awaitable.ready
*/ */
def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost) def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost)
@ -62,7 +62,7 @@ object Await {
* WARNING: Blocking operation, use with caution. * WARNING: Blocking operation, use with caution.
* *
* @throws [[java.util.concurrent.TimeoutException]] if times out * @throws [[java.util.concurrent.TimeoutException]] if times out
* @returns The returned value as returned by Awaitable.result * @return The returned value as returned by Awaitable.result
*/ */
def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)
} }
@ -192,7 +192,7 @@ object Future {
def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val futureResult = Promise[T]() val futureResult = Promise[T]()
val completeFirst: Either[Throwable, T] Unit = futureResult complete _ val completeFirst: Either[Throwable, T] Unit = futureResult tryComplete _
futures.foreach(_ onComplete completeFirst) futures.foreach(_ onComplete completeFirst)
futureResult futureResult
@ -208,12 +208,12 @@ object Future {
val ref = new AtomicInteger(futures.size) val ref = new AtomicInteger(futures.size)
val search: Either[Throwable, T] Unit = v try { val search: Either[Throwable, T] Unit = v try {
v match { v match {
case Right(r) if (predicate(r)) result success Some(r) case Right(r) if (predicate(r)) result tryComplete Right(Some(r))
case _ case _
} }
} finally { } finally {
if (ref.decrementAndGet == 0) if (ref.decrementAndGet == 0)
result success None result tryComplete Right(None)
} }
futures.foreach(_ onComplete search) futures.foreach(_ onComplete search)
@ -279,13 +279,13 @@ object Future {
* The Delimited Continuations compiler plugin must be enabled in order to use this method. * The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/ */
def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
val future = Promise[A] val p = Promise[A]
dispatchTask({ () dispatchTask({ ()
(reify(body) foreachFull (future success, future failure): Future[Any]) onFailure { (reify(body) foreachFull (p success, p failure): Future[Any]) onFailure {
case e: Exception future failure e case NonFatal(e) p tryComplete Left(e)
} }
}, true) }, true)
future p.future
} }
/** /**
@ -379,7 +379,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] {
case Left(t) p failure t case Left(t) p failure t
case Right(r) that onSuccess { case r2 p success ((r, r2)) } case Right(r) that onSuccess { case r2 p success ((r, r2)) }
} }
that onFailure { case f p failure f } that onFailure { case f p tryComplete Left(f) }
p.future p.future
} }
@ -411,7 +411,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] {
* callbacks may be registered; there is no guarantee that they will be * callbacks may be registered; there is no guarantee that they will be
* executed in a particular order. * executed in a particular order.
*/ */
def onComplete(func: Either[Throwable, T] Unit): this.type def onComplete[U](func: Either[Throwable, T] U): this.type
/** /**
* When the future is completed with a valid result, apply the provided * When the future is completed with a valid result, apply the provided
@ -483,7 +483,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] {
final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
val p = Promise[A]() val p = Promise[A]()
onComplete { onComplete {
case Left(e) if pf isDefinedAt e p.complete(try { Right(pf(e)) } catch { case x: Exception Left(x) }) case Left(e) if pf isDefinedAt e p.complete(try { Right(pf(e)) } catch { case NonFatal(x) Left(x) })
case otherwise p complete otherwise case otherwise p complete otherwise
} }
p.future p.future
@ -699,9 +699,12 @@ trait Promise[T] extends Future[T] {
/** /**
* Completes this Promise with the specified result, if not already completed. * Completes this Promise with the specified result, if not already completed.
* @throws IllegalStateException if already completed, this is to aid in debugging of complete-races,
* use tryComplete to do a conditional complete.
* @return this * @return this
*/ */
final def complete(value: Either[Throwable, T]): this.type = { tryComplete(value); this } final def complete(value: Either[Throwable, T]): this.type =
if (tryComplete(value)) this else throw new IllegalStateException("Promise already completed: " + this + " tried to complete with " + value)
/** /**
* Completes this Promise with the specified result, if not already completed. * Completes this Promise with the specified result, if not already completed.
@ -721,7 +724,7 @@ trait Promise[T] extends Future[T] {
* @return this. * @return this.
*/ */
final def completeWith(other: Future[T]): this.type = { final def completeWith(other: Future[T]): this.type = {
other onComplete { complete(_) } other onComplete { tryComplete(_) }
this this
} }
@ -840,7 +843,7 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
} }
} }
def onComplete(func: Either[Throwable, T] Unit): this.type = { def onComplete[U](func: Either[Throwable, T] U): this.type = {
@tailrec //Returns whether the future has already been completed or not @tailrec //Returns whether the future has already been completed or not
def tryAddCallback(): Either[Throwable, T] = { def tryAddCallback(): Either[Throwable, T] = {
val cur = getState val cur = getState
@ -858,9 +861,8 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
} }
} }
private final def notifyCompleted(func: Either[Throwable, T] Unit, result: Either[Throwable, T]) { private final def notifyCompleted[U](func: Either[Throwable, T] U, result: Either[Throwable, T]): Unit =
try { func(result) } catch { case NonFatal(e) executor.reportFailure(e) } try func(result) catch { case NonFatal(e) executor reportFailure e }
}
} }
/** /**
@ -871,7 +873,7 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe
val value = Some(resolve(suppliedValue)) val value = Some(resolve(suppliedValue))
def tryComplete(value: Either[Throwable, T]): Boolean = false def tryComplete(value: Either[Throwable, T]): Boolean = false
def onComplete(func: Either[Throwable, T] Unit): this.type = { def onComplete[U](func: Either[Throwable, T] U): this.type = {
val completedAs = value.get val completedAs = value.get
Future dispatchTask (() func(completedAs)) Future dispatchTask (() func(completedAs))
this this
@ -982,7 +984,7 @@ abstract class Recover[+T] extends japi.RecoverBridge[T] {
* This method will be invoked once when/if the Future this recover callback is registered on * This method will be invoked once when/if the Future this recover callback is registered on
* becomes completed with a failure. * becomes completed with a failure.
* *
* @returns a successful value for the passed in failure * @return a successful value for the passed in failure
* @throws the passed in failure to propagate it. * @throws the passed in failure to propagate it.
* *
* Java API * Java API
@ -1005,7 +1007,7 @@ abstract class Filter[-T] extends japi.BooleanFunctionBridge[T] {
* This method will be invoked once when/if a Future that this callback is registered on * This method will be invoked once when/if a Future that this callback is registered on
* becomes completed with a success. * becomes completed with a success.
* *
* @returns true if the successful value should be propagated to the new Future or not * @return true if the successful value should be propagated to the new Future or not
*/ */
def filter(result: T): Boolean def filter(result: T): Boolean
} }

View file

@ -333,14 +333,14 @@ trait MailboxType {
* It's a case class for Java (new UnboundedMailbox) * It's a case class for Java (new UnboundedMailbox)
*/ */
case class UnboundedMailbox() extends MailboxType { case class UnboundedMailbox() extends MailboxType {
override def create(receiver: ActorContext): Mailbox = final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]() final val queue = new ConcurrentLinkedQueue[Envelope]()
} }
} }
case class UnboundedDequeBasedMailbox(config: Config) extends MailboxType { case class UnboundedDequeBasedMailbox(config: Config) extends MailboxType {
override def create(receiver: ActorContext): Mailbox = final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingDeque[Envelope]() final val queue = new LinkedBlockingDeque[Envelope]()
} }
@ -351,7 +351,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(receiver: ActorContext) = final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingQueue[Envelope](capacity) final val queue = new LinkedBlockingQueue[Envelope](capacity)
final val pushTimeOut = BoundedMailbox.this.pushTimeOut final val pushTimeOut = BoundedMailbox.this.pushTimeOut
@ -359,7 +359,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
} }
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
override def create(receiver: ActorContext) = final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new PriorityBlockingQueue[Envelope](11, cmp) final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
} }
@ -370,7 +370,7 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(receiver: ActorContext) = final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut

View file

@ -8,14 +8,21 @@ import akka.AkkaException
import akka.util.ReflectiveAccess import akka.util.ReflectiveAccess
import scala.util.DynamicVariable import scala.util.DynamicVariable
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address } import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address }
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging import akka.event.Logging
import scala.collection.mutable.ArrayBuffer
import java.io.NotSerializableException
case class NoSerializerFoundException(m: String) extends AkkaException(m) case class NoSerializerFoundException(m: String) extends AkkaException(m)
object Serialization { object Serialization {
/**
* Tuple that represents mapping from Class to Serializer
*/
type ClassSerializer = (Class[_], Serializer)
/** /**
* This holds a reference to the current ActorSystem (the surrounding context) * This holds a reference to the current ActorSystem (the surrounding context)
* during serialization and deserialization. * during serialization and deserialization.
@ -40,28 +47,19 @@ object Serialization {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import config._ import config._
val Serializers: Map[String, String] = val Serializers: Map[String, String] = configToMap(getConfig("akka.actor.serializers"))
getConfig("akka.actor.serializers").root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
val SerializationBindings: Map[String, Seq[String]] = { val SerializationBindings: Map[String, String] = configToMap(getConfig("akka.actor.serialization-bindings"))
val configPath = "akka.actor.serialization-bindings"
hasPath(configPath) match { private def configToMap(cfg: Config): Map[String, String] =
case false Map() cfg.root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
case true
val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).root.unwrapped.asScala.toMap.map {
case (k: String, v: java.util.Collection[_]) (k -> v.asScala.toSeq.asInstanceOf[Seq[String]])
case invalid throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid))
}
serializationBindings
}
}
} }
} }
/** /**
* Serialization module. Contains methods for serialization and deserialization as well as * Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. * locating a Serializer for a particular class as defined in the mapping in the configuration.
*/ */
class Serialization(val system: ExtendedActorSystem) extends Extension { class Serialization(val system: ExtendedActorSystem) extends Extension {
import Serialization._ import Serialization._
@ -105,8 +103,10 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
} catch { case e: Exception Left(e) } } catch { case e: Exception Left(e) }
/** /**
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null, * Returns the Serializer configured for the given object, returns the NullSerializer if it's null.
* falls back to the Serializer named "default" *
* @throws akka.config.ConfigurationException if no `serialization-bindings` is configured for the
* class of the object
*/ */
def findSerializerFor(o: AnyRef): Serializer = o match { def findSerializerFor(o: AnyRef): Serializer = o match {
case null NullSerializer case null NullSerializer
@ -114,82 +114,94 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
} }
/** /**
* Returns the configured Serializer for the given Class, falls back to the Serializer named "default". * Returns the configured Serializer for the given Class. The configured Serializer
* It traverses interfaces and super classes to find any configured Serializer that match * is used if the configured class `isAssignableFrom` from the `clazz`, i.e.
* the class name. * the configured class is a super class or implemented interface. In case of
* ambiguity it is primarily using the most specific configured class,
* and secondly the entry configured first.
*
* @throws java.io.NotSerializableException if no `serialization-bindings` is configured for the class
*/ */
def serializerFor(clazz: Class[_]): Serializer = def serializerFor(clazz: Class[_]): Serializer =
if (bindings.isEmpty) { serializerMap.get(clazz) match {
// quick path to default when no bindings are registered case null
serializers("default") // bindings are ordered from most specific to least specific
} else { def unique(possibilities: Seq[(Class[_], Serializer)]): Boolean =
possibilities.size == 1 ||
(possibilities map (_._1) forall (_ isAssignableFrom possibilities(0)._1)) ||
(possibilities map (_._2) forall (_ == possibilities(0)._2))
def resolve(c: Class[_]): Option[Serializer] = val ser = bindings filter { _._1 isAssignableFrom clazz } match {
serializerMap.get(c.getName) match { case Seq()
case null throw new NotSerializableException("No configured serialization-bindings for class [%s]" format clazz.getName)
val classes = c.getInterfaces ++ Option(c.getSuperclass) case possibilities
classes.view map resolve collectFirst { case Some(x) x } if (!unique(possibilities))
case x Some(x) log.warning("Multiple serializers found for " + clazz + ", choosing first: " + possibilities)
possibilities(0)._2
} }
serializerMap.putIfAbsent(clazz, ser) match {
serializerMap.get(clazz.getName) match { case null
case null log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName)
val ser = resolve(clazz).getOrElse(serializers("default")) ser
// memorize the lookups for performance case some some
serializerMap.putIfAbsent(clazz.getName, ser) match { }
case null case ser ser
log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName)
ser
case some some
}
case ser ser
}
} }
/** /**
* Tries to load the specified Serializer by the FQN * Tries to instantiate the specified Serializer by the FQN
*/ */
def serializerOf(serializerFQN: String): Either[Exception, Serializer] = def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs) ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs, system.internalClassLoader)
/** /**
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer * By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer
* But "default" can be overridden in config
*/ */
lazy val serializers: Map[String, Serializer] = { private val serializers: Map[String, Serializer] = {
val serializersConf = settings.Serializers for ((k: String, v: String) settings.Serializers)
for ((k: String, v: String) serializersConf)
yield k -> serializerOf(v).fold(throw _, identity) yield k -> serializerOf(v).fold(throw _, identity)
} }
/** /**
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used * bindings is a Seq of tuple representing the mapping from Class to Serializer.
* It is primarily ordered by the most specific classes first, and secondly in the configured order.
*/ */
lazy val bindings: Map[String, String] = { private[akka] val bindings: Seq[ClassSerializer] = {
settings.SerializationBindings.foldLeft(Map[String, String]()) { val configuredBindings = for ((k: String, v: String) settings.SerializationBindings if v != "none") yield {
//All keys which are lists, take the Strings from them and Map them val c = ReflectiveAccess.getClassFor(k, system.internalClassLoader).fold(throw _, identity[Class[_]])
case (result, (k: String, vs: Seq[_])) result ++ (vs collect { case v: String (v, k) }) (c, serializers(v))
//For any other values, just skip them
case (result, _) result
} }
sort(configuredBindings)
} }
/** /**
* serializerMap is a Map whose keys = FQN of class that is serializable and values is the serializer to be used for that class * Sort so that subtypes always precede their supertypes, but without
* obeying any order between unrelated subtypes (insert sort).
*/ */
private lazy val serializerMap: ConcurrentHashMap[String, Serializer] = { private def sort(in: Iterable[ClassSerializer]): Seq[ClassSerializer] =
val serializerMap = new ConcurrentHashMap[String, Serializer] (new ArrayBuffer[ClassSerializer](in.size) /: in) { (buf, ca)
for ((k, v) bindings) { buf.indexWhere(_._1 isAssignableFrom ca._1) match {
serializerMap.put(k, serializers(v)) case -1 buf append ca
case x buf insert (x, ca)
}
buf
} }
/**
* serializerMap is a Map whose keys is the class that is serializable and values is the serializer
* to be used for that class.
*/
private val serializerMap: ConcurrentHashMap[Class[_], Serializer] = {
val serializerMap = new ConcurrentHashMap[Class[_], Serializer]
for ((c, s) bindings) serializerMap.put(c, s)
serializerMap serializerMap
} }
/** /**
* Maps from a Serializer Identity (Int) to a Serializer instance (optimization) * Maps from a Serializer Identity (Int) to a Serializer instance (optimization)
*/ */
lazy val serializerByIdentity: Map[Int, Serializer] = val serializerByIdentity: Map[Int, Serializer] =
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) } Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) }
} }

View file

@ -5,8 +5,9 @@ package akka.util
/** /**
* Extractor of non-fatal Throwables. Will not match fatal errors * Extractor of non-fatal Throwables. Will not match fatal errors
* like VirtualMachineError (OutOfMemoryError, StackOverflowError) * like VirtualMachineError (OutOfMemoryError)
* ThreadDeath, and InterruptedException. * ThreadDeath, LinkageError and InterruptedException.
* StackOverflowError is matched, i.e. considered non-fatal.
* *
* Usage to catch all harmless throwables: * Usage to catch all harmless throwables:
* {{{ * {{{
@ -20,8 +21,9 @@ package akka.util
object NonFatal { object NonFatal {
def unapply(t: Throwable): Option[Throwable] = t match { def unapply(t: Throwable): Option[Throwable] = t match {
// VirtualMachineError includes OutOfMemoryError, StackOverflowError and other fatal errors case e: StackOverflowError Some(e) // StackOverflowError ok even though it is a VirtualMachineError
case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException None // VirtualMachineError includes OutOfMemoryError and other fatal errors
case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError None
case e Some(e) case e Some(e)
} }

View file

@ -10,11 +10,13 @@ akka {
# The dispatcher used for agent-send-off actor # The dispatcher used for agent-send-off actor
send-off-dispatcher { send-off-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher
} }
# The dispatcher used for agent-alter-off actor # The dispatcher used for agent-alter-off actor
alter-off-dispatcher { alter-off-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher
} }

View file

@ -369,7 +369,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
/** /**
* Gossips to a random member in the set of members passed in as argument. * Gossips to a random member in the set of members passed in as argument.
* *
* @returns 'true' if it gossiped to a "seed" member. * @return 'true' if it gossiped to a "seed" member.
*/ */
private def gossipToRandomNodeOf(members: Set[Member]): Boolean = { private def gossipToRandomNodeOf(members: Set[Member]): Boolean = {
val peers = members filter (_.address != address) // filter out myself val peers = members filter (_.address != address) // filter out myself

View file

@ -54,61 +54,7 @@ public class SerializationDocTestBase {
} }
} }
//#my-own-serializer //#my-own-serializer
@Test public void haveExamples() {
/*
//#serialize-messages-config
akka {
actor {
serialize-messages = on
}
}
//#serialize-messages-config
//#serialize-creators-config
akka {
actor {
serialize-creators = on
}
}
//#serialize-creators-config
//#serialize-serializers-config
akka {
actor {
serializers {
default = "akka.serialization.JavaSerializer"
myown = "akka.docs.serialization.MyOwnSerializer"
}
}
}
//#serialize-serializers-config
//#serialization-bindings-config
akka {
actor {
serializers {
default = "akka.serialization.JavaSerializer"
java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer"
myown = "akka.docs.serialization.MyOwnSerializer"
}
serialization-bindings {
java = ["java.lang.String",
"app.my.Customer"]
proto = ["com.google.protobuf.Message"]
myown = ["my.own.BusinessObject",
"something.equally.Awesome",
"akka.docs.serialization.MyOwnSerializable"
"java.lang.Boolean"]
}
}
}
//#serialization-bindings-config
*/
}
@Test public void demonstrateTheProgrammaticAPI() { @Test public void demonstrateTheProgrammaticAPI() {
//#programmatic //#programmatic

View file

@ -55,6 +55,17 @@ Default values are taken from ``default-dispatcher``, i.e. all options doesn't n
:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override :ref:`configuration` for the default values of the ``default-dispatcher``. You can also override
the values for the ``default-dispatcher`` in your configuration. the values for the ``default-dispatcher`` in your configuration.
There are two different executor services:
* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for
``default-dispatcher``.
* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
Note that the pool size is configured differently for the two executor services. The configuration above
is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
Let's now walk through the different dispatchers in more detail. Let's now walk through the different dispatchers in more detail.
Thread-based Thread-based
@ -67,9 +78,11 @@ has worse performance and scalability than the event-based dispatcher but works
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
this dispatcher is that Actors do not block threads for each other. this dispatcher is that Actors do not block threads for each other.
The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this: The ``PinnedDispatcher`` is configured like this:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
Note that it must be used with ``executor = "thread-pool-executor"``.
Event-based Event-based
^^^^^^^^^^^ ^^^^^^^^^^^

View file

@ -26,7 +26,7 @@ to your ``application.conf`` file::
} }
remote { remote {
transport = "akka.remote.netty.NettyRemoteTransport" transport = "akka.remote.netty.NettyRemoteTransport"
server { netty {
hostname = "127.0.0.1" hostname = "127.0.0.1"
port = 2552 port = 2552
} }

View file

@ -25,47 +25,39 @@ For Akka to know which ``Serializer`` to use for what, you need edit your :ref:`
in the "akka.actor.serializers"-section you bind names to implementations of the ``akka.serialization.Serializer`` in the "akka.actor.serializers"-section you bind names to implementations of the ``akka.serialization.Serializer``
you wish to use, like this: you wish to use, like this:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-serializers-config .. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config
.. note::
The name ``default`` is special in the sense that the ``Serializer``
mapped to it will be used as default.
After you've bound names to different implementations of ``Serializer`` you need to wire which classes After you've bound names to different implementations of ``Serializer`` you need to wire which classes
should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section: should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialization-bindings-config .. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config
.. note:: You only need to specify the name of an interface or abstract base class of the
messages. In case of ambiguity, i.e. the message implements several of the
configured classes, the most specific configured class will be used, i.e. the
one of which all other candidates are superclasses. If this condition cannot be
met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply
and neither is a subtype of the other, a warning will be issued.
You only need to specify the name of an interface or abstract base class if the messages implements Akka provides serializers for :class:`java.io.Serializable` and `protobuf
that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. <http://code.google.com/p/protobuf/>`_
:class:`com.google.protobuf.GeneratedMessage` by default (the latter only if
depending on the akka-remote module), so normally you don't need to add
configuration for that; since :class:`com.google.protobuf.GeneratedMessage`
implements :class:`java.io.Serializable`, protobuf messages will always by
serialized using the protobuf protocol unless specifically overridden. In order
to disable a default serializer, map its marker type to “none”::
Protobuf akka.actor.serialization-bindings {
-------- "java.io.Serializable" = none
}
Akka provides a ``Serializer`` for `protobuf <http://code.google.com/p/protobuf/>`_ messages.
To use that you need to add the following to the configuration::
akka {
actor {
serializers {
proto = "akka.serialization.ProtobufSerializer"
}
serialization-bindings {
proto = ["com.google.protobuf.Message"]
}
}
}
Verification Verification
------------ ------------
If you want to verify that your messages are serializable you can enable the following config option: If you want to verify that your messages are serializable you can enable the following config option:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-messages-config .. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-messages-config
.. warning:: .. warning::
@ -74,7 +66,7 @@ If you want to verify that your messages are serializable you can enable the fol
If you want to verify that your ``Props`` are serializable you can enable the following config option: If you want to verify that your ``Props`` are serializable you can enable the following config option:
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-creators-config .. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-creators-config
.. warning:: .. warning::
@ -110,4 +102,4 @@ which is done by extending ``akka.serialization.JSerializer``, like this:
:exclude: ... :exclude: ...
Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then
list which classes that should be serialized using it. list which classes that should be serialized using it.

View file

@ -20,6 +20,27 @@ object DispatcherDocSpec {
val config = """ val config = """
//#my-dispatcher-config //#my-dispatcher-config
my-dispatcher { my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the number of messages that are processed in a batch before the
# thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 100
}
//#my-dispatcher-config
//#my-thread-pool-dispatcher-config
my-thread-pool-dispatcher {
# Dispatcher is the name of the event-based dispatcher # Dispatcher is the name of the event-based dispatcher
type = Dispatcher type = Dispatcher
# What kind of ExecutionService to use # What kind of ExecutionService to use
@ -37,7 +58,14 @@ object DispatcherDocSpec {
# thread is returned to the pool. Set to 1 for as fair as possible. # thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 100 throughput = 100
} }
//#my-dispatcher-config //#my-thread-pool-dispatcher-config
//#my-pinned-dispatcher-config
my-pinned-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
//#my-pinned-dispatcher-config
//#my-bounded-config //#my-bounded-config
my-dispatcher-bounded-queue { my-dispatcher-bounded-queue {

View file

@ -45,6 +45,9 @@ class MyOwnSerializer extends Serializer {
} }
//#my-own-serializer //#my-own-serializer
trait MyOwnSerializable
case class Customer(name: String) extends MyOwnSerializable
class SerializationDocSpec extends AkkaSpec { class SerializationDocSpec extends AkkaSpec {
"demonstrate configuration of serialize messages" in { "demonstrate configuration of serialize messages" in {
//#serialize-messages-config //#serialize-messages-config
@ -82,8 +85,8 @@ class SerializationDocSpec extends AkkaSpec {
akka { akka {
actor { actor {
serializers { serializers {
default = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer"
myown = "akka.docs.serialization.MyOwnSerializer" myown = "akka.docs.serialization.MyOwnSerializer"
} }
} }
@ -91,8 +94,6 @@ class SerializationDocSpec extends AkkaSpec {
""") """)
//#serialize-serializers-config //#serialize-serializers-config
val a = ActorSystem("system", config) val a = ActorSystem("system", config)
SerializationExtension(a).serializers("default").getClass.getName must equal("akka.serialization.JavaSerializer")
SerializationExtension(a).serializers("myown").getClass.getName must equal("akka.docs.serialization.MyOwnSerializer")
a.shutdown() a.shutdown()
} }
@ -102,31 +103,26 @@ class SerializationDocSpec extends AkkaSpec {
akka { akka {
actor { actor {
serializers { serializers {
default = "akka.serialization.JavaSerializer"
java = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer" proto = "akka.serialization.ProtobufSerializer"
myown = "akka.docs.serialization.MyOwnSerializer" myown = "akka.docs.serialization.MyOwnSerializer"
} }
serialization-bindings { serialization-bindings {
java = ["java.lang.String", "java.lang.String" = java
"app.my.Customer"] "akka.docs.serialization.Customer" = java
proto = ["com.google.protobuf.Message"] "com.google.protobuf.Message" = proto
myown = ["my.own.BusinessObject", "akka.docs.serialization.MyOwnSerializable" = myown
"something.equally.Awesome", "java.lang.Boolean" = myown
"akka.docs.serialization.MyOwnSerializable" }
"java.lang.Boolean"]
}
} }
} }
""") """)
//#serialization-bindings-config //#serialization-bindings-config
val a = ActorSystem("system", config) val a = ActorSystem("system", config)
SerializationExtension(a).serializers("default").getClass.getName must equal("akka.serialization.JavaSerializer") SerializationExtension(a).serializerFor(classOf[String]).getClass must equal(classOf[JavaSerializer])
SerializationExtension(a).serializers("java").getClass.getName must equal("akka.serialization.JavaSerializer") SerializationExtension(a).serializerFor(classOf[Customer]).getClass must equal(classOf[JavaSerializer])
SerializationExtension(a).serializers("myown").getClass.getName must equal("akka.docs.serialization.MyOwnSerializer") SerializationExtension(a).serializerFor(classOf[java.lang.Boolean]).getClass must equal(classOf[MyOwnSerializer])
SerializationExtension(a).serializerFor(classOf[String]).getClass.getName must equal("akka.serialization.JavaSerializer")
SerializationExtension(a).serializerFor(classOf[java.lang.Boolean]).getClass.getName must equal("akka.docs.serialization.MyOwnSerializer")
a.shutdown() a.shutdown()
} }

View file

@ -54,6 +54,17 @@ Default values are taken from ``default-dispatcher``, i.e. all options doesn't n
:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override :ref:`configuration` for the default values of the ``default-dispatcher``. You can also override
the values for the ``default-dispatcher`` in your configuration. the values for the ``default-dispatcher`` in your configuration.
There are two different executor services:
* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for
``default-dispatcher``.
* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
Note that the pool size is configured differently for the two executor services. The configuration above
is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
Let's now walk through the different dispatchers in more detail. Let's now walk through the different dispatchers in more detail.
Thread-based Thread-based
@ -66,9 +77,11 @@ has worse performance and scalability than the event-based dispatcher but works
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
this dispatcher is that Actors do not block threads for each other. this dispatcher is that Actors do not block threads for each other.
The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this: The ``PinnedDispatcher`` is configured like this:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher .. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
Note that it must be used with ``executor = "thread-pool-executor"``.
Event-based Event-based
^^^^^^^^^^^ ^^^^^^^^^^^

View file

@ -27,7 +27,7 @@ to your ``application.conf`` file::
} }
remote { remote {
transport = "akka.remote.netty.NettyRemoteTransport" transport = "akka.remote.netty.NettyRemoteTransport"
server { netty {
hostname = "127.0.0.1" hostname = "127.0.0.1"
port = 2552 port = 2552
} }

View file

@ -27,38 +27,30 @@ you wish to use, like this:
.. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config .. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config
.. note::
The name ``default`` is special in the sense that the ``Serializer``
mapped to it will be used as default.
After you've bound names to different implementations of ``Serializer`` you need to wire which classes After you've bound names to different implementations of ``Serializer`` you need to wire which classes
should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section: should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section:
.. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config .. includecode:: code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config
.. note:: You only need to specify the name of an interface or abstract base class of the
messages. In case of ambiguity, i.e. the message implements several of the
configured classes, the most specific configured class will be used, i.e. the
one of which all other candidates are superclasses. If this condition cannot be
met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply
and neither is a subtype of the other, a warning will be issued
You only need to specify the name of an interface or abstract base class if the messages implements Akka provides serializers for :class:`java.io.Serializable` and `protobuf
that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. <http://code.google.com/p/protobuf/>`_
:class:`com.google.protobuf.GeneratedMessage` by default (the latter only if
depending on the akka-remote module), so normally you don't need to add
configuration for that; since :class:`com.google.protobuf.GeneratedMessage`
implements :class:`java.io.Serializable`, protobuf messages will always by
serialized using the protobuf protocol unless specifically overridden. In order
to disable a default serializer, map its marker type to “none”::
Protobuf akka.actor.serialization-bindings {
-------- "java.io.Serializable" = none
}
Akka provides a ``Serializer`` for `protobuf <http://code.google.com/p/protobuf/>`_ messages.
To use that you need to add the following to the configuration::
akka {
actor {
serializers {
proto = "akka.serialization.ProtobufSerializer"
}
serialization-bindings {
proto = ["com.google.protobuf.Message"]
}
}
}
Verification Verification
------------ ------------
@ -108,4 +100,4 @@ First you need to create a class definition of your ``Serializer`` like so:
:exclude: ... :exclude: ...
Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then
list which classes that should be serialized using it. list which classes that should be serialized using it.

View file

@ -1,2 +1,3 @@
# In this file you can override any option defined in the 'reference.conf' files. # In this file you can override any option defined in the 'reference.conf' files.
# Copy in all or parts of the 'reference.conf' files and modify as you please. # Copy in all or parts of the 'reference.conf' files and modify as you please.
# For more info about config, please visit the Akka Documentation: http://akka.io/docs/akka/2.0-SNAPSHOT/

View file

@ -5,10 +5,24 @@
# This the reference config file has all the default settings. # This the reference config file has all the default settings.
# Make your edits/overrides in your application.conf. # Make your edits/overrides in your application.conf.
# comments above akka.actor settings left out where they are already in akka-
# actor.jar, because otherwise they would be repeated in config rendering.
akka { akka {
actor { actor {
serializers {
proto = "akka.serialization.ProtobufSerializer"
}
serialization-bindings {
# Since com.google.protobuf.Message does not extend Serializable but GeneratedMessage
# does, need to use the more specific one here in order to avoid ambiguity
"com.google.protobuf.GeneratedMessage" = proto
}
deployment { deployment {
default { default {
@ -133,6 +147,7 @@ akka {
# The dispatcher used for the system actor "network-event-sender" # The dispatcher used for the system actor "network-event-sender"
network-event-sender-dispatcher { network-event-sender-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher type = PinnedDispatcher
} }
} }

View file

@ -0,0 +1,25 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.testkit.AkkaSpec
import akka.remote.RemoteProtocol.MessageProtocol
import akka.actor.ProtobufProtocol.MyMessage
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ProtobufSerializerSpec extends AkkaSpec {
val ser = SerializationExtension(system)
"Serialization" must {
"resolve protobuf serializer" in {
ser.serializerFor(classOf[MessageProtocol]).getClass must be(classOf[ProtobufSerializer])
ser.serializerFor(classOf[MyMessage]).getClass must be(classOf[ProtobufSerializer])
}
}
}

View file

@ -15,6 +15,7 @@ akka {
socket-dispatcher { socket-dispatcher {
# A zeromq socket needs to be pinned to the thread that created it. # A zeromq socket needs to be pinned to the thread that created it.
# Changing this value results in weird errors and race conditions within zeromq # Changing this value results in weird errors and race conditions within zeromq
executor = thread-pool-executor
type = "PinnedDispatcher" type = "PinnedDispatcher"
} }
} }