diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 2d7534c755..7168daa265 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -182,7 +182,7 @@ object FSMTimingSpec { when(TestCancelTimer) { case Ev(Tick) ⇒ setTimer("hallo", Tock, 1 milli, false) - TestKit.awaitCond(!context.dispatcher.mailboxIsEmpty(context.asInstanceOf[ActorCell]), 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) cancelTimer("hallo") sender ! Tick setTimer("hallo", Tock, 500 millis, false) @@ -209,7 +209,7 @@ object FSMTimingSpec { case Ev(Tick) ⇒ suspend(self) setTimer("named", Tock, 1 millis, false) - TestKit.awaitCond(!context.dispatcher.mailboxIsEmpty(context.asInstanceOf[ActorCell]), 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) stay forMax (1 millis) replying Tick case Ev(Tock) ⇒ goto(TestCancelStateTimerInNamedTimerMessage2) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 15bf70b2e6..4fbb67fbb4 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -64,9 +64,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = Promise[String]().complete(Left(new RuntimeException(message))) behave like futureWithException[RuntimeException](_(future, message)) } - "completed with a j.u.c.TimeoutException" must { - val message = "Boxed TimeoutException" - val future = Promise[String]().complete(Left(new TimeoutException(message))) + "completed with an InterruptedException" must { + val message = "Boxed InterruptedException" + val future = Promise[String]().complete(Left(new InterruptedException(message))) behave like futureWithException[RuntimeException](_(future, message)) } "completed with a NonLocalReturnControl" must { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 534c4a757d..b797827680 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -75,7 +75,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn result } - def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters) + def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system) def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { q must not be null diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index cceb608452..f90f651065 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -10,6 +10,9 @@ import akka.testkit.AkkaSpec import com.typesafe.config.ConfigFactory import akka.actor._ import java.io._ +import akka.dispatch.Await +import akka.util.Timeout +import akka.util.duration._ object SerializeSpec { @@ -129,3 +132,67 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { } } } + +object VerifySerializabilitySpec { + val conf = ConfigFactory.parseString(""" + akka { + actor { + serialize-messages = on + + serialize-creators = on + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.testing.ProtobufSerializer" + sjson = "akka.testing.SJSONSerializer" + default = "akka.serialization.JavaSerializer" + } + + serialization-bindings { + java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] + sjson = ["akka.serialization.SerializeSpec$Person"] + proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"] + } + } + } + """) + + class FooActor extends Actor { + def receive = { + case s: String ⇒ sender ! s + } + } + + class NonSerializableActor(system: ActorSystem) extends Actor { + def receive = { + case s: String ⇒ sender ! s + } + } +} + +class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) { + import VerifySerializabilitySpec._ + implicit val timeout = Timeout(5 seconds) + + "verify config" in { + system.settings.SerializeAllCreators must be(true) + system.settings.SerializeAllMessages must be(true) + } + + "verify creators" in { + val a = system.actorOf(Props[FooActor]) + intercept[NotSerializableException] { + Await.result(a ? new AnyRef, timeout.duration) + } + system stop a + } + + "verify messages" in { + val a = system.actorOf(Props[FooActor]) + Await.result(a ? "pigdog", timeout.duration) must be("pigdog") + intercept[java.io.NotSerializableException] { + val b = system.actorOf(Props(new NonSerializableActor(system))) + } + system stop a + } +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 42993515e8..6dd2b2a452 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -50,10 +50,14 @@ akka { # - TypedActor: methods with non-void return type timeout = 5s - # Does a deep clone of (non-primitive) messages to ensure immutability - # FIXME: not used, make use of it or remove the option + # Serializes and deserializes (non-primitive) messages to ensure immutability, + # this is only intended for testing. serialize-messages = off + # Serializes and deserializes creators (in Props) to ensure that they can be sent over the network, + # this is only intended for testing. + serialize-creators = off + deployment { # deployment id pattern - on the format: /parent/child etc. diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2c44ec70c6..8713df95b4 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -25,8 +25,17 @@ import java.util.regex.Pattern */ trait AutoReceivedMessage extends Serializable +/** + * Marker trait to indicate that a message might be potentially harmful, + * this is used to block messages coming in over remoting. + */ trait PossiblyHarmful +/** + * Marker trait to signal that this class should not be verified for serializability. + */ +trait NoSerializationVerificationNeeded + case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 6a532136b4..69b8c0b6c9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -13,6 +13,7 @@ import akka.event.Logging.{ Debug, Warning, Error } import akka.util.{ Duration, Helpers } import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } +import akka.serialization.SerializationExtension //TODO: everything here for current compatibility - could be limited more @@ -214,6 +215,16 @@ private[akka] class ActorCell( final var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs private def _actorOf(props: Props, name: String): ActorRef = { + if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { + val ser = SerializationExtension(system) + ser.serialize(props.creator) match { + case Left(t) ⇒ throw t + case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass, None) match { + case Left(t) ⇒ throw t + case _ ⇒ //All good + } + } + } val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None) childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor)) actor @@ -308,7 +319,7 @@ private[akka] class ActorCell( } final def tell(message: Any, sender: ActorRef): Unit = - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)) + dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)(system)) final def sender: ActorRef = currentMessage match { case null ⇒ system.deadLetters @@ -566,7 +577,7 @@ private[akka] class ActorCell( final def checkReceiveTimeout() { val recvtimeout = receiveTimeoutData - if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) { + if (recvtimeout._1 > 0 && !mailbox.hasMessages) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index 399011fd7a..a36c5e8973 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -110,6 +110,7 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act } final class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath { + if (name.indexOf('/') != -1) throw new IllegalArgumentException("/ is a path separator and is not legal in ActorPath names: [%s]" format name) def address: Address = root.address diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c9da4d5ae7..ee53fec688 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -13,7 +13,7 @@ import java.util.concurrent.TimeUnit import akka.event.EventStream import akka.event.DeathWatch import scala.annotation.tailrec -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ ConcurrentHashMap, TimeoutException } import akka.event.LoggingAdapter import java.util.concurrent.atomic.AtomicBoolean @@ -106,6 +106,8 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable * Akka Java API. * * Sends a message asynchronously returns a future holding the eventual reply message. + * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given + * timeout has expired. * * NOTE: * Use this method with care. In most cases it is better to use 'tell' together with the sender @@ -177,6 +179,9 @@ trait ScalaActorRef { ref: ActorRef ⇒ /** * Sends a message asynchronously, returning a future which may eventually hold the reply. + * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given + * timeout has expired. + * * NOTE: * Use this method with care. In most cases it is better to use '!' together with implicit or explicit * sender parameter to implement non-blocking request/response message exchanges. @@ -489,6 +494,14 @@ class VirtualPathContainer(val path: ActorPath, override val getParent: Internal } } +/** + * This is what is used to complete a Future that is returned from an ask/? call, + * when it times out. + */ +class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { + def this(message: String) = this(message, null: Throwable) +} + class AskActorRef( val path: ActorPath, override val getParent: InternalActorRef, diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 115fca87d5..57ef9f108c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -6,14 +6,12 @@ package akka.actor import java.util.concurrent.atomic.AtomicLong import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer } -import akka.util.Timeout import akka.util.Timeout.intToTimeout import akka.config.ConfigurationException import akka.dispatch._ import akka.routing._ -import akka.util.Timeout import akka.AkkaException -import akka.util.{ Duration, Switch, Helpers } +import akka.util.{ Duration, Switch, Helpers, Timeout } import akka.event._ import java.io.Closeable @@ -485,15 +483,15 @@ class LocalActorRefProvider( def ask(within: Timeout): Option[AskActorRef] = { (if (within == null) settings.ActorTimeout else within) match { - case t if t.duration.length <= 0 ⇒ - None + case t if t.duration.length <= 0 ⇒ None case t ⇒ val path = tempPath() val name = path.name val a = new AskActorRef(path, tempContainer, dispatcher, deathWatch) tempContainer.addChild(name, a) - val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { tempContainer.removeChild(name); a.stop() } - a.result onComplete { _ ⇒ + val result = a.result + val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) } + result onComplete { _ ⇒ try { a.stop(); f.cancel() } finally { tempContainer.removeChild(name) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 8c9a3bd5b9..dbe5630789 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -58,9 +58,9 @@ object ActorSystem { def create(): ActorSystem = apply() def apply(): ActorSystem = apply("default") - class Settings(cfg: Config, val name: String) { + class Settings(cfg: Config, final val name: String) { - val config: Config = { + final val config: Config = { val config = cfg.withFallback(ConfigFactory.defaultReference) config.checkValid(ConfigFactory.defaultReference, "akka") config @@ -69,81 +69,39 @@ object ActorSystem { import scala.collection.JavaConverters._ import config._ - val ConfigVersion = getString("akka.version") + final val ConfigVersion = getString("akka.version") - val ProviderClass = getString("akka.actor.provider") + final val ProviderClass = getString("akka.actor.provider") - val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) - val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) - val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) - val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") + final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) + final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) + final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) + final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") + final val SerializeAllCreators = getBoolean("akka.actor.serialize-creators") - val LogLevel = getString("akka.loglevel") - val StdoutLogLevel = getString("akka.stdout-loglevel") - val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala - val LogConfigOnStart = config.getBoolean("akka.logConfigOnStart") - val AddLoggingReceive = getBoolean("akka.actor.debug.receive") - val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive") - val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle") - val FsmDebugEvent = getBoolean("akka.actor.debug.fsm") - val DebugEventStream = getBoolean("akka.actor.debug.event-stream") + final val LogLevel = getString("akka.loglevel") + final val StdoutLogLevel = getString("akka.stdout-loglevel") + final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala + final val LogConfigOnStart = config.getBoolean("akka.logConfigOnStart") + final val AddLoggingReceive = getBoolean("akka.actor.debug.receive") + final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive") + final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle") + final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm") + final val DebugEventStream = getBoolean("akka.actor.debug.event-stream") - val Home = config.getString("akka.home") match { + final val Home = config.getString("akka.home") match { case "" ⇒ None case x ⇒ Some(x) } - val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) - val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") + final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) + final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") if (ConfigVersion != Version) throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") override def toString: String = config.root.render } - - // TODO move to migration kit - object OldConfigurationLoader { - - val defaultConfig: Config = { - val cfg = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig - cfg.withFallback(ConfigFactory.defaultReference).resolve(ConfigResolveOptions.defaults) - } - - // file extensions (.conf, .json, .properties), are handled by parseFileAnySyntax - val defaultLocation: String = (systemMode orElse envMode).map("akka." + _).getOrElse("akka") - - private def envMode = System.getenv("AKKA_MODE") match { - case null | "" ⇒ None - case value ⇒ Some(value) - } - - private def systemMode = System.getProperty("akka.mode") match { - case null | "" ⇒ None - case value ⇒ Some(value) - } - - private def configParseOptions = ConfigParseOptions.defaults.setAllowMissing(false) - - private def fromProperties = try { - val property = Option(System.getProperty("akka.config")) - property.map(p ⇒ - ConfigFactory.systemProperties.withFallback( - ConfigFactory.parseFileAnySyntax(new File(p), configParseOptions))) - } catch { case _ ⇒ None } - - private def fromClasspath = try { - Option(ConfigFactory.systemProperties.withFallback( - ConfigFactory.parseResourcesAnySyntax(ActorSystem.getClass, "/" + defaultLocation, configParseOptions))) - } catch { case _ ⇒ None } - - private def fromHome = try { - Option(ConfigFactory.systemProperties.withFallback( - ConfigFactory.parseFileAnySyntax(new File(GlobalHome.get + "/config/" + defaultLocation), configParseOptions))) - } catch { case _ ⇒ None } - - private def emptyConfig = ConfigFactory.systemProperties - } } /** @@ -323,7 +281,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor import ActorSystem._ - val settings = new Settings(applicationConfig, name) + final val settings = new Settings(applicationConfig, name) def logConfiguration(): Unit = log.info(settings.toString) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index e4e2ee856a..afdd683419 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -46,15 +46,53 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = object FaultHandlingStrategy { sealed trait Action + + /** + * Resumes message processing for the failed Actor + */ case object Resume extends Action + + /** + * Discards the old Actor instance and replaces it with a new, + * then resumes message processing. + */ case object Restart extends Action + + /** + * Stops the Actor + */ case object Stop extends Action + + /** + * Escalates the failure to the supervisor of the supervisor, + * by rethrowing the cause of the failure. + */ case object Escalate extends Action - // Java API + /** + * Resumes message processing for the failed Actor + * Java API + */ def resume = Resume + + /** + * Discards the old Actor instance and replaces it with a new, + * then resumes message processing. + * Java API + */ def restart = Restart + + /** + * Stops the Actor + * Java API + */ def stop = Stop + + /** + * Escalates the failure to the supervisor of the supervisor, + * by rethrowing the cause of the failure. + * Java API + */ def escalate = Escalate type Decider = PartialFunction[Throwable, Action] diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 905d2d6498..24b9da4447 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -6,21 +6,30 @@ package akka.dispatch import java.util.concurrent._ import akka.event.Logging.Error -import akka.util.{ Duration, Switch, ReentrantGuard } -import atomic.{ AtomicInteger, AtomicLong } -import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy } +import akka.util.Duration import akka.actor._ import akka.actor.ActorSystem -import locks.ReentrantLock import scala.annotation.tailrec import akka.event.EventStream -import akka.actor.ActorSystem.Settings import com.typesafe.config.Config -import java.util.concurrent.atomic.AtomicReference import akka.util.ReflectiveAccess +import akka.serialization.SerializationExtension -final case class Envelope(val message: Any, val sender: ActorRef) { - if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null") +final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { + if (message.isInstanceOf[AnyRef]) { + val msg = message.asInstanceOf[AnyRef] + if (msg eq null) throw new InvalidMessageException("Message is null") + if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) { + val ser = SerializationExtension(system) + ser.serialize(msg) match { //Verify serializability + case Left(t) ⇒ throw t + case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass, None) match { //Verify deserializability + case Left(t) ⇒ throw t + case _ ⇒ //All good + } + } + } + } } object SystemMessage { @@ -103,7 +112,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext def name: String /** - * Identfier of this dispatcher, corresponds to the full key + * Identifier of this dispatcher, corresponds to the full key * of the dispatcher configuration. */ def id: String @@ -257,16 +266,6 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext * Must be idempotent */ protected[akka] def shutdown(): Unit - - /** - * Returns the size of the mailbox for the specified actor - */ - def mailboxSize(actor: ActorCell): Int = actor.mailbox.numberOfMessages - - /** - * Returns the "current" emptiness status of the mailbox for the specified actor - */ - def mailboxIsEmpty(actor: ActorCell): Boolean = !actor.mailbox.hasMessages } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 2906cd7b08..59996e9311 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -89,7 +89,7 @@ object Futures { /** * Java API - * A non-blocking fold over the specified futures. + * A non-blocking fold over the specified futures, with the start value of the given zero. * The fold is performed on the thread where the last future is completed, * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. @@ -201,13 +201,13 @@ object Future { } /** - * A non-blocking fold over the specified futures. + * A non-blocking fold over the specified futures, with the start value of the given zero. * The fold is performed on the thread where the last future is completed, * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. * Example: *
- * val result = Await.result(Futures.fold(0)(futures)(_ + _), 5 seconds)
+ * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
*
*/
def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = {
@@ -231,7 +231,7 @@ object Future {
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel:
*
- * val myFutureList = Futures.traverse(myList)(x ⇒ Future(myFunc(x)))
+ * val myFutureList = Future.traverse(myList)(x ⇒ Future(myFunc(x)))
*
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
@@ -337,7 +337,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) ⇒ Right(t.value.asInstanceOf[X])
- case Left(t: TimeoutException) ⇒ Left(new RuntimeException("Boxed TimeoutException", t))
+ case Left(t: InterruptedException) ⇒ Left(new RuntimeException("Boxed InterruptedException", t))
case _ ⇒ source
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
index 22dfd15ffa..3a5733f9ef 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
@@ -111,10 +111,16 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))
def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(corePoolSize = size))
+ if (config.maxPoolSize < size)
+ this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
+ else
+ this.copy(config = config.copy(corePoolSize = size))
def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(maxPoolSize = size))
+ if (config.corePoolSize > size)
+ this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
+ else
+ this.copy(config = config.copy(maxPoolSize = size))
def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder =
setCorePoolSize(scaledPoolSize(min, multiplier, max))
@@ -206,6 +212,11 @@ trait ExecutorServiceDelegate extends ExecutorService {
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}
+/**
+ * The RejectedExecutionHandler used by Akka, it improves on CallerRunsPolicy
+ * by throwing a RejectedExecutionException if the executor isShutdown.
+ * (CallerRunsPolicy silently discards the runnable in this case, which is arguably broken)
+ */
class SaneRejectedExecutionHandler extends RejectedExecutionHandler {
def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor): Unit = {
if (threadPoolExecutor.isShutdown) throw new RejectedExecutionException("Shutdown")
diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala
index 1797eb9d18..fbceb6973d 100644
--- a/akka-actor/src/main/scala/akka/event/Logging.scala
+++ b/akka-actor/src/main/scala/akka/event/Logging.scala
@@ -363,7 +363,7 @@ object Logging {
* message. This is necessary to ensure that additional subscriptions are in
* effect when the logging system finished starting.
*/
- case class InitializeLogger(bus: LoggingBus)
+ case class InitializeLogger(bus: LoggingBus) extends NoSerializationVerificationNeeded
/**
* Response message each logger must send within 1 second after receiving the
diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala
index 138be5e902..988820cf18 100644
--- a/akka-actor/src/main/scala/akka/routing/Pool.scala
+++ b/akka-actor/src/main/scala/akka/routing/Pool.scala
@@ -165,7 +165,7 @@ trait SmallestMailboxSelector {
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
def mailboxSize(a: ActorRef): Int = a match {
- case l: LocalActorRef ⇒ l.underlying.dispatcher.mailboxSize(l.underlying)
+ case l: LocalActorRef ⇒ l.underlying.mailbox.numberOfMessages
case _ ⇒ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority
}
@@ -282,7 +282,7 @@ trait MailboxPressureCapacitor {
def pressureThreshold: Int
def pressure(delegates: Seq[ActorRef]): Int =
delegates count {
- case a: LocalActorRef ⇒ a.underlying.dispatcher.mailboxSize(a.underlying) > pressureThreshold
+ case a: LocalActorRef ⇒ a.underlying.mailbox.numberOfMessages > pressureThreshold
case _ ⇒ false
}
}
diff --git a/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java b/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java
index 5067b7a51b..1998c45a76 100644
--- a/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java
+++ b/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java
@@ -34,7 +34,7 @@ public class SchedulerDocTestBase {
@Before
public void setUp() {
system = ActorSystem.create("MySystem", AkkaSpec.testConf());
- testActor = system.actorOf(new Props().withCreator(MyUntypedActor.class));
+ testActor = system.actorOf(new Props(MyUntypedActor.class));
}
@After
diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java
index 16311ca57b..0985861b00 100644
--- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java
+++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java
@@ -100,7 +100,7 @@ public class UntypedActorDocTestBase {
public void propsActorOf() {
ActorSystem system = ActorSystem.create("MySystem");
//#creating-props
- ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"),
+ ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor");
//#creating-props
myActor.tell("test");
diff --git a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java
index fc76c36a14..8b2006edd8 100644
--- a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java
+++ b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java
@@ -57,9 +57,9 @@ public class DispatcherDocTestBase {
@Test
public void defineDispatcher() {
//#defining-dispatcher
- ActorRef myActor1 = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"),
+ ActorRef myActor1 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor1");
- ActorRef myActor2 = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"),
+ ActorRef myActor2 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor2");
//#defining-dispatcher
}
@@ -68,7 +68,7 @@ public class DispatcherDocTestBase {
public void definePinnedDispatcher() {
//#defining-pinned-dispatcher
String name = "myactor";
- ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class)
+ ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)
.withDispatcher("myactor-dispatcher"), name);
//#defining-pinned-dispatcher
}
diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst
index f60699b59c..ddd142ddbd 100644
--- a/akka-docs/java/untyped-actors.rst
+++ b/akka-docs/java/untyped-actors.rst
@@ -317,8 +317,9 @@ If invoked without the sender parameter the sender will be
Ask: Send-And-Receive-Future
----------------------------
-Using ``ask`` will send a message to the receiving Actor asynchronously and
-will immediately return a :class:`Future`:
+Using ``?`` will send a message to the receiving Actor asynchronously and
+will immediately return a :class:`Future` which will be completed with
+an ``akka.actor.AskTimeoutException`` after the specified timeout:
.. code-block:: java
diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst
index 524e536101..8393b7c2b8 100644
--- a/akka-docs/scala/actors.rst
+++ b/akka-docs/scala/actors.rst
@@ -353,7 +353,8 @@ Ask: Send-And-Receive-Future
----------------------------
Using ``?`` will send a message to the receiving Actor asynchronously and
-will immediately return a :class:`Future`:
+will immediately return a :class:`Future` which will be completed with
+an ``akka.actor.AskTimeoutException`` after the specified timeout:
.. code-block:: scala
diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst
index 4b2318c58d..9f9e8594f7 100644
--- a/akka-docs/scala/remoting.rst
+++ b/akka-docs/scala/remoting.rst
@@ -17,69 +17,97 @@ In you SBT project you should add the following as a dependency::
"com.typesafe.akka" % "akka-remote" % "2.0-SNAPSHOT"
-First of all you have to change the actor provider from ``LocalActorRefProvider`` to ``RemoteActorRefProvider``::
+To enable remote capabilities in your Akka project you should, at a minimum, add the following changes
+to your ``application.conf`` file::
akka {
actor {
- provider = "akka.remote.RemoteActorRefProvider"
+ provider = "akka.remote.RemoteActorRefProvider"
}
- }
-
-After that you must also add the following settings::
-
- akka {
remote {
+ transport = "akka.remote.netty.NettyRemoteSupport"
server {
- # The hostname or ip to bind the remoting to,
- # InetAddress.getLocalHost.getHostAddress is used if empty
- hostname = ""
-
- # The default remote server port clients should connect to.
- # Default is 2552 (AKKA)
+ hostname = "127.0.0.1"
port = 2552
}
- }
+ }
}
-These are the bare minimal settings that must exist in order to get started with remoting.
-There are, of course, more properties that can be tweaked. We refer to the following
+As you can see in the example above there are four things you need to add to get started:
+
+* Change provider from ``akka.actor.LocalActorRefProvider`` to ``akka.remote.RemoteActorRefProvider``
+* Add host name - the machine you want to run the actor system on
+* Add port number - the port the actor system should listen on
+
+The example above only illustrates the bare minimum of properties you have to add to enable remoting.
+There are lots of more properties that are related to remoting in Akka. We refer to the following
reference file for more information:
* `reference.conf of akka-remote