Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Peter Veentjer 2011-07-20 11:20:58 +03:00
commit fee47cd6ac
35 changed files with 578 additions and 180 deletions

View file

@ -334,7 +334,7 @@ class ActorRefSpec extends WordSpec with MustMatchers {
val ref = Actor.actorOf(
new Actor {
def receive = { case _ ⇒ }
override def preRestart(reason: Throwable) = latch.countDown()
override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown()
override def postRestart(reason: Throwable) = latch.countDown()
}).start()

View file

@ -0,0 +1,161 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.matchers.MustMatchers
import Actor.actorOf
import akka.testkit._
import akka.util.duration._
import akka.config.Supervision.OneForOneStrategy
import java.util.concurrent.atomic._
object ActorRestartSpec {
private var _gen = new AtomicInteger(0)
def generation = _gen.incrementAndGet
def generation_=(x: Int) { _gen.set(x) }
sealed trait RestartType
case object Normal extends RestartType
case object Nested extends RestartType
case object Handover extends RestartType
case object Fail extends RestartType
class Restarter(val testActor: ActorRef) extends Actor {
val gen = generation
var xx = 0
var restart: RestartType = Normal
def receive = {
case x: Int ⇒ xx = x
case t: RestartType ⇒ restart = t
case "get" ⇒ self reply xx
}
override def preStart { testActor ! (("preStart", gen)) }
override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! (("preRestart", msg, gen)) }
override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) }
override def freshInstance() = {
restart match {
case Normal ⇒ None
case Nested ⇒
val ref = TestActorRef(new Actor {
def receive = { case _ ⇒ }
override def preStart { testActor ! ((this, self)) }
}).start()
testActor ! ((ref.underlyingActor, ref))
None
case Handover ⇒
val fresh = new Restarter(testActor)
fresh.xx = xx
Some(fresh)
case Fail ⇒
throw new IllegalActorStateException("expected")
}
}
}
class Supervisor extends Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
def receive = {
case _ ⇒
}
}
}
class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach {
import ActorRestartSpec._
override def beforeEach { generation = 0 }
override def afterEach {
val it = toStop.iterator
while (it.hasNext) {
it.next.stop()
it.remove
}
}
private var toStop = new java.util.concurrent.ConcurrentSkipListSet[ActorRef]
private def newActor(f: Actor): ActorRef = {
val ref = actorOf(f)
toStop add ref
ref.start()
}
"An Actor restart" must {
"invoke preRestart, preStart, postRestart" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
}
"support creation of nested actors in freshInstance()" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Nested
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])]
tRef.underlyingActor must be(tActor)
expectMsg((tActor, tRef))
tRef.stop()
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
}
"use freshInstance() if available" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Handover
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
actor ! "get"
expectMsg(1 second, 42)
}
"fall back to default factory if freshInstance() fails" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Fail
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
actor ! "get"
expectMsg(1 second, 0)
}
}
}

View file

@ -32,7 +32,7 @@ object FSMTransitionSpec {
case Ev("reply") ⇒ stay replying "reply"
}
initialize
override def preRestart(reason: Throwable) { target ! "restarted" }
override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" }
}
class Forwarder(target: ActorRef) extends Actor {

View file

@ -25,7 +25,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers {
case Die ⇒ throw new Exception(self.address + " is dying...")
}
override def preRestart(reason: Throwable) {
override def preRestart(reason: Throwable, msg: Option[Any]) {
log += self.address
}
}

View file

@ -62,7 +62,7 @@ object Ticket669Spec {
case msg ⇒ throw new Exception("test")
}
override def preRestart(reason: scala.Throwable) {
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
self.tryReply("failure1")
}

View file

@ -10,6 +10,7 @@ import akka.util.duration._
import akka.actor._
import akka.actor.Actor._
import akka.routing._
import akka.event.EventHandler
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.{ KeptPromise, Future }

View file

@ -5,6 +5,7 @@
package akka.actor
import DeploymentConfig._
import akka.experimental
import akka.dispatch._
import akka.config._
import Config._
@ -662,9 +663,23 @@ trait Actor {
/**
* User overridable callback.
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
* Is called on a crashed Actor right BEFORE it is restarted to allow clean
* up of resources before Actor is terminated.
*/
def preRestart(reason: Throwable) {}
def preRestart(reason: Throwable, message: Option[Any]) {}
/**
* User overridable callback.
* <p/>
* Is called on the crashed Actor to give it the option of producing the
* Actor's reincarnation. If it returns None, which is the default, the
* initially provided actor factory is used.
* <p/>
* <b>Warning:</b> <i>Propagating state from a crashed actor carries the risk
* of proliferating the cause of the error. Consider let-it-crash first.</i>
*/
@experimental("1.2")
def freshInstance(): Option[Actor] = None
/**
* User overridable callback.

View file

@ -776,7 +776,8 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
def performRestart() {
val failedActor = actorInstance.get
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
failedActor.preRestart(reason)
val message = if (currentMessage ne null) Some(currentMessage.message) else None
failedActor.preRestart(reason, message)
val freshActor = newActor
setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call
@ -857,7 +858,20 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
val stackBefore = refStack.get
refStack.set(stackBefore.push(this))
try {
actorFactory()
if (_status == ActorRefInternals.BEING_RESTARTED) {
val a = actor
val fresh = try a.freshInstance catch {
case e ⇒
EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory")
None
}
fresh match {
case Some(ref) ⇒ ref
case None ⇒ actorFactory()
}
} else {
actorFactory()
}
} finally {
val stackAfter = refStack.get
if (stackAfter.nonEmpty)
@ -1011,7 +1025,7 @@ private[akka] case class RemoteActorRef private[akka](
case _ ⇒ None
}
val chFuture = channel match {
case f: Promise[Any] ⇒ Some(f)
case f: Promise[_] ⇒ Some(f.asInstanceOf[Promise[Any]])
case _ ⇒ None
}
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader)

View file

@ -102,7 +102,7 @@ abstract class UntypedActor extends Actor {
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
*/
override def preRestart(reason: Throwable) {}
override def preRestart(reason: Throwable, lastMessage: Option[Any]) {}
/**
* User overridable callback.

View file

@ -197,14 +197,15 @@ object Dispatchers {
case "GlobalDispatcher" ⇒ GlobalDispatcherConfigurator
case fqn ⇒
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
case r: Right[_, Class[MessageDispatcherConfigurator]] ⇒
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match {
case r: Right[Exception, MessageDispatcherConfigurator] ⇒ r.b
case l: Left[Exception, MessageDispatcherConfigurator] ⇒
throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a)
case Right(clazz) ⇒
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match {
case Right(configurator) ⇒ configurator
case Left(exception) ⇒
throw new IllegalArgumentException(
"Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception)
}
case l: Left[Exception, _] ⇒
throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a)
case Left(exception) ⇒
throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, exception)
}
} map {
_ configure cfg

View file

@ -90,8 +90,8 @@ object Futures {
val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
f.value.get match {
case r: Right[Throwable, T] ⇒
val added = results add r.b
case Right(value) ⇒
val added = results add value
if (added && results.size == allDone) { //Only one thread can get here
if (done.switchOn) {
try {
@ -109,9 +109,9 @@ object Futures {
}
}
}
case l: Left[Throwable, T] ⇒
case Left(exception) ⇒
if (done.switchOn) {
result completeWithException l.a
result completeWithException exception
results.clear
}
}
@ -148,10 +148,8 @@ object Futures {
val seedFold: Future[T] ⇒ Unit = f ⇒ {
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
f.value.get match {
case r: Right[Throwable, T] ⇒
result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op))
case l: Left[Throwable, T] ⇒
result.completeWithException(l.a)
case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
case Left(exception) ⇒ result.completeWithException(exception)
}
}
}

View file

@ -112,8 +112,8 @@ object EventHandler extends ListenerManagement {
defaultListeners foreach { listenerName ⇒
try {
ReflectiveAccess.getClassFor[Actor](listenerName) match {
case r: Right[_, Class[Actor]] ⇒ addListener(Actor.localActorOf(r.b).start())
case l: Left[Exception, _] ⇒ throw l.a
case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start())
case Left(exception) ⇒ throw exception
}
} catch {
case e: Exception ⇒

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka
import annotation.target._
/**
* This annotation marks a feature which is not yet considered stable and may
* change or be removed in a future release.
*
* @author Roland Kuhn
* @since 1.2
*/
@getter
@setter
@beanGetter
@beanSetter
final class experimental(since: String) extends annotation.StaticAnnotation

View file

@ -265,7 +265,7 @@ trait MailboxPressureCapacitor {
*/
trait ActiveFuturesPressureCapacitor {
def pressure(delegates: Seq[ActorRef]): Int =
delegates count { _.channel.isInstanceOf[Promise[Any]] }
delegates count { _.channel.isInstanceOf[Promise[_]] }
}
/**

View file

@ -8,17 +8,28 @@ import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, B
import akka.util.ClassLoaderObjectInputStream
object Serializer {
val defaultSerializerName = JavaSerializer.getClass.getName
val defaultSerializerName = classOf[JavaSerializer].getName
type Identifier = Byte
}
/**
* A Serializer represents a bimap between an object and an array of bytes representing that object
*/
trait Serializer extends scala.Serializable {
/**
* Completely unique Byte value to identify this implementation of Serializer, used to optimize network traffic
* Values from 0 to 16 are reserved for Akka internal usage
*/
def identifier: Serializer.Identifier
/**
* Serializes the given object into an Array of Byte
*/
def toBinary(o: AnyRef): Array[Byte]
/**
* Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into
*/
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef
}
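For illustration, a minimal custom implementation of this trait might look as follows (a sketch only: the class name and the identifier value 17 are hypothetical, and plain Java serialization is used as the example mechanism):
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }
class MyJavaSerializer extends Serializer {
  // must not collide with any other Serializer; 0 to 16 are reserved for Akka
  def identifier: Serializer.Identifier = 17.toByte
  def toBinary(o: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream
    val out = new ObjectOutputStream(bos)
    try out.writeObject(o) finally out.close()
    bos.toByteArray
  }
  // the type hint and classloader are ignored in this sketch
  def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }
}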

View file

@ -58,7 +58,7 @@ trait ProducerSupport { this: Actor ⇒
* Default implementation of <code>Actor.preRestart</code> for freeing resources needed
* to actually send messages to <code>endpointUri</code>.
*/
override def preRestart(reason: Throwable) {
override def preRestart(reason: Throwable, msg: Option[Any]) {
try { preRestartProducer(reason) } finally { processor.stop }
}

View file

@ -251,7 +251,7 @@ object ConsumerScalaTest {
case "succeed" ⇒ self.reply("ok")
}
override def preRestart(reason: scala.Throwable) {
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
self.tryReply("pr")
}

View file

@ -703,8 +703,6 @@ class DefaultClusterNode private[akka](
serializeMailbox: Boolean,
serializer: Serializer): ClusterNode = if (isConnected.isOn) {
val serializerClassName = serializer.getClass.getName
EventHandler.debug(this,
"Storing actor with address [%s] in cluster".format(actorAddress))
@ -739,9 +737,9 @@ class DefaultClusterNode private[akka](
// create ADDRESS -> SERIALIZER CLASS NAME mapping
try {
zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializer.identifier.toString)
} catch {
case e: ZkNodeExistsException ⇒ zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
case e: ZkNodeExistsException ⇒ zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializer.identifier.toString)
}
// create ADDRESS -> NODE mapping
@ -1084,21 +1082,10 @@ class DefaultClusterNode private[akka](
/**
* Returns Serializer for actor with specific address.
*/
def serializerForActor(actorAddress: String): Serializer = {
val serializerClassName =
try {
zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String]
} catch {
case e: ZkNoNodeException ⇒ throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress))
}
ReflectiveAccess.getClassFor(serializerClassName) match {
// FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess.
case Right(clazz) ⇒ clazz.newInstance.asInstanceOf[Serializer]
case Left(error) ⇒
EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString))
throw error
}
def serializerForActor(actorAddress: String): Serializer = try {
Serialization.serializerByIdentity(zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String].toByte)
} catch {
case e: ZkNoNodeException ⇒ throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress))
}
/**
@ -1790,7 +1777,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def preRestart(reason: Throwable) {
override def preRestart(reason: Throwable, msg: Option[Any]) {
EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s] restarting...".format(reason))
}
@ -1930,7 +1917,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[Unit] ⇒ try {
case f: Function0[_] ⇒ try {
f()
} finally {
self.stop()
@ -1943,7 +1930,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[Any] ⇒ try {
case f: Function0[_] ⇒ try {
self.reply(f())
} finally {
self.stop()
@ -1956,8 +1943,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[Any, Unit], param: Any) ⇒ try {
fun(param)
case (fun: Function[_, _], param: Any) ⇒ try {
fun.asInstanceOf[Any => Unit].apply(param)
} finally {
self.stop()
}
@ -1969,8 +1956,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[Any, Unit], param: Any) ⇒ try {
self.reply(fun(param))
case (fun: Function[_, _], param: Any) ⇒ try {
self.reply(fun.asInstanceOf[Any => Any](param))
} finally {
self.stop()
}

View file

@ -88,14 +88,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
lock.readLock.lock
try {
val c = remoteClients.get(key) match {
case s: Some[RemoteClient] ⇒ s.get
case Some(client) ⇒ client
case None ⇒
lock.readLock.unlock
lock.writeLock.lock //Lock upgrade, not supported natively
try {
try {
remoteClients.get(key) match { //Recheck for addition, race between upgrades
case s: Some[RemoteClient] ⇒ s.get //If already populated by other writer
case Some(client) ⇒ client //If already populated by other writer
case None ⇒ //Populate map
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
client.connect()
@ -111,15 +111,15 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
remoteClients.remove(Address(address)) match {
case s: Some[RemoteClient] ⇒ s.get.shutdown()
case None ⇒ false
case Some(client) ⇒ client.shutdown()
case None ⇒ false
}
}
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
remoteClients.get(Address(address)) match {
case s: Some[RemoteClient] ⇒ s.get.connect(reconnectIfAlreadyConnected = true)
case None ⇒ false
case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true)
case None ⇒ false
}
}
@ -632,12 +632,12 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def address = currentServer.get match {
case s: Some[NettyRemoteServer] ⇒ s.get.address
case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress
case Some(server) ⇒ server.address
case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress
}
def name = currentServer.get match {
case s: Some[NettyRemoteServer] ⇒ s.get.name
case Some(server) ⇒ server.name
case None ⇒
val a = ReflectiveAccess.RemoteModule.configDefaultAddress
"NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort
@ -920,15 +920,15 @@ class RemoteServerHandler(
request.getActorInfo.getTimeout,
new ActorPromise(request.getActorInfo.getTimeout).
onComplete(_.value.get match {
case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request))
case r: Right[Throwable, Any] ⇒
case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request))
case r: Right[_, _] ⇒
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getAddress,
actorInfo.getTimeout,
r,
true,
r.asInstanceOf[Either[Throwable,Any]],
isOneWay = true,
Some(actorRef))
// FIXME lift in the supervisor uuid management into the createRemoteMessageProtocolBuilder method

View file

@ -95,10 +95,10 @@ object ActorSerialization {
if (actorRef.mailbox eq null) throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
val messages =
actorRef.mailbox match {
case q: java.util.Queue[MessageInvocation] ⇒
case q: java.util.Queue[_] ⇒
val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
val it = q.iterator
while (it.hasNext == true) l += it.next
while (it.hasNext) l += it.next.asInstanceOf[MessageInvocation]
l
}

View file

@ -57,16 +57,14 @@ class RegistryStoreMultiJvmNode1 extends MasterClusterTestNode {
}
barrier("store-1-in-node-1", NrOfNodes) {
val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x ⇒ fail("No serializer found"), s ⇒ s)
node.store("hello-world-1", classOf[HelloWorld1], serializer)
node.store("hello-world-1", classOf[HelloWorld1], Serialization.serializerFor(classOf[HelloWorld1]))
}
barrier("use-1-in-node-2", NrOfNodes) {
}
barrier("store-2-in-node-1", NrOfNodes) {
val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x ⇒ fail("No serializer found"), s ⇒ s)
node.store("hello-world-2", classOf[HelloWorld1], false, serializer)
node.store("hello-world-2", classOf[HelloWorld1], false, Serialization.serializerFor(classOf[HelloWorld1]))
}
barrier("use-2-in-node-2", NrOfNodes) {

View file

@ -0,0 +1,6 @@
.. _migration-1.2:
################################
Migration Guide 1.1.x to 1.2.x
################################

View file

@ -0,0 +1,20 @@
.. _migration-2.0:
################################
Migration Guide 1.2.x to 2.0.x
################################
Actors
======
The 2.0 release contains several new features which require source-level
changes in client code. This API cleanup is planned to be the last one for a
significant amount of time.
Lifecycle Callbacks
-------------------
The :meth:`preRestart(cause: Throwable)` method has been replaced by
:meth:`preRestart(cause: Throwable, lastMessage: Option[Any])`, hence you must
insert the second argument in all overriding methods. The good news is that
any override you miss will fail to compile.
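For example, an override written for 1.2.x and its 2.0.x replacement (a
minimal sketch; ``cleanup()`` stands in for whatever resource handling your
actor does):

.. code-block:: scala

   // 1.2.x
   override def preRestart(cause: Throwable) {
     cleanup()
   }

   // 2.0.x -- lastMessage is the message being processed when the failure
   // occurred, or None if the failure did not happen during message
   // processing
   override def preRestart(cause: Throwable, lastMessage: Option[Any]) {
     cleanup()
   }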

View file

@ -6,6 +6,8 @@ Migration Guides
.. toctree::
:maxdepth: 1
migration-guide-1.2.x-2.0.x
migration-guide-1.1.x-1.2.x
migration-guide-1.0.x-1.1.x
migration-guide-0.10.x-1.0.x
migration-guide-0.9.x-0.10.x

View file

@ -92,6 +92,90 @@ Here we create a light-weight actor-based thread, that can be used to spawn off
... // do stuff
}
Actor Internal API
------------------
The :class:`Actor` trait defines only one abstract method, the above-mentioned
:meth:`receive`. In addition, it offers two convenience methods,
:meth:`become`/:meth:`unbecome`, for modifying the hotswap behavior stack as
described in :ref:`Actor.HotSwap`, and the :obj:`self` reference to this actor's
:class:`ActorRef` object. If the current actor behavior does not match a
received message, :meth:`unhandled` is called, which by default throws an
:class:`UnhandledMessageException`.
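A minimal sketch of the behavior stack (the class and messages are made up
for illustration):

.. code-block:: scala

   class Toggler extends Actor {
     def receive = {
       case "activate" ⇒ become(active) // push a new behavior on the stack
       case msg        ⇒ println("inactive: " + msg)
     }
     def active: PartialFunction[Any, Unit] = {
       case "deactivate" ⇒ unbecome()   // pop back to the original receive
       case msg          ⇒ println("active: " + msg)
     }
   }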
The remaining visible methods are user-overridable life-cycle hooks which are
described in the following::
def preStart() {}
def preRestart(cause: Throwable, message: Option[Any]) {}
def freshInstance(): Option[Actor] = None
def postRestart(cause: Throwable) {}
def postStop() {}
The implementations shown above are the defaults provided by the :class:`Actor`
trait.
Start Hook
^^^^^^^^^^
Right after starting the actor, its :meth:`preStart` method is invoked. This is
guaranteed to happen before the first message from external sources is queued
to the actor's mailbox.
::
override def preStart {
// e.g. send initial message to self
self ! GetMeStarted
// or do any other stuff, e.g. registering with other actors
someService ! Register(self)
}
Restart Hooks
^^^^^^^^^^^^^
A supervised actor, i.e. one which is linked to another actor with a fault
handling strategy, will be restarted in case an exception is thrown while
processing a message. This restart involves four of the hooks mentioned above:
1. The old actor is informed by calling :meth:`preRestart` with the exception
which caused the restart and the message which triggered that exception; the
latter may be ``None`` if the restart was not caused by processing a
message, e.g. when a supervisor does not trap the exception and is restarted
in turn by its supervisor. This method is the best place for cleaning up,
preparing hand-over to the fresh actor instance, etc.
2. The old actor's :meth:`freshInstance` factory method is invoked, which may
optionally produce the new actor instance which will replace this actor. If
this method returns :obj:`None` or throws an exception, the initial factory
from the ``Actor.actorOf`` call is used to produce the fresh instance.
3. The new actor's :meth:`preStart` method is invoked, just as in the normal
start-up case.
4. The new actor's :meth:`postRestart` method is called with the exception
which caused the restart.
.. warning::
The :meth:`freshInstance` hook may be used to propagate (part of) the failed
actor's state to the fresh instance. This carries the risk of proliferating
the cause for the crash which triggered the restart. If you are tempted to
take this route, it is strongly advised to step back and consider other
possible approaches, e.g. distributing the state in question using other
means or spawning short-lived worker actors for carrying out “risky” tasks.
An actor restart replaces only the actual actor object; the contents of the
mailbox and the hotswap stack are unaffected by the restart, so processing of
messages will resume after the :meth:`postRestart` hook returns. Any message
sent to an actor while it is being restarted will be queued to its mailbox as
usual.
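As an illustration, a hand-over of state to the reincarnation, keeping the
warning above in mind (a minimal sketch; the ``Counter`` actor is made up for
this example):

.. code-block:: scala

   class Counter extends Actor {
     var count = 0
     def receive = {
       case "inc"  ⇒ count += 1
       case "boom" ⇒ throw new RuntimeException("expected")
     }
     // propagate the counter to the fresh instance; returning None (the
     // default) would make the restart fall back to the original factory
     // given to Actor.actorOf
     override def freshInstance(): Option[Actor] = {
       val fresh = new Counter
       fresh.count = count
       Some(fresh)
     }
   }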
Stop Hook
^^^^^^^^^
After stopping an actor, its :meth:`postStop` hook is called, which may be used
e.g. for deregistering this actor from other services. This hook is guaranteed
to run after message queuing has been disabled for this actor, i.e. sending
messages would fail with an :class:`IllegalActorStateException`.
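For example (``someService`` and ``Deregister`` being the hypothetical
counterparts of the registration shown under *Start Hook* above):

.. code-block:: scala

   override def postStop() {
     // the mirror image of the registration performed in preStart
     someService ! Deregister(self)
   }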
Identifying Actors
------------------
@ -252,43 +336,6 @@ This method should return a ``PartialFunction``, e.g. a match/case clause
}
}
Actor internal API
------------------
The Actor trait contains almost no member fields or methods to invoke; you just use the Actor trait to implement the:
#. ``receive`` message handler
#. life-cycle callbacks:
#. preStart
#. postStop
#. preRestart
#. postRestart
The ``Actor`` trait has one single member field:
.. code-block:: scala
val self: ActorRef
This ``self`` field holds a reference to the Actor's ``ActorRef``, and it is through this reference that you access the Actor's API. Here, for example, you find methods to reply to messages, send yourself messages, define timeouts and fault tolerance, start and stop, etc.
However, for convenience you can import these functions and fields like below, which will allow you to drop the ``self`` prefix:
.. code-block:: scala
class MyActor extends Actor {
import self._
id = ...
dispatcher = ...
start
...
}
But in this documentation we will always prefix the calls with ``self`` for clarity.
Let's start by looking how we can reply to messages in a convenient way using this ``ActorRef`` API.
Reply to messages
-----------------
@ -441,6 +488,8 @@ You can also send an actor the ``akka.actor.PoisonPill`` message, which will sto
If the sender is a ``Future`` (e.g. the message is sent with ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``.
.. _Actor.HotSwap:
HotSwap
-------

View file

@ -93,7 +93,7 @@ You can also set the rejection policy that should be used, e.g. what should be d
* java.util.concurrent.ThreadPoolExecutor.DiscardPolicy - discards the message (throws it away)
* java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy - discards the oldest message in the mailbox (throws it away)
You cane read more about these policies `here <http://java.sun.com/javase/6/docs/api/index.html?java/util/concurrent/RejectedExecutionHandler.html>`_.
You can read more about these policies `here <http://java.sun.com/javase/6/docs/api/index.html?java/util/concurrent/RejectedExecutionHandler.html>`_.
Here is an example:
@ -104,7 +104,7 @@ Here is an example:
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
class MyActor extends Actor {
self.dispatcher = Dispatchers.newDispatcher(name)
self.dispatcher = Dispatchers.newDispatcher(name, throughput = 15)
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
@ -114,8 +114,14 @@ Here is an example:
...
}
This 'Dispatcher' allows you to define the 'throughput' it should have. This defines the number of messages for a specific Actor the dispatcher should process in one single sweep.
Setting this to a higher number will increase throughput but lower fairness, and vice versa. If you don't specify it explicitly then it uses the default value defined in the 'akka.conf' configuration file:
The standard :class:`Dispatcher` allows you to define the ``throughput`` it
should have, as shown above. This defines the number of messages for a specific
Actor the dispatcher should process in one single sweep; in other words, the
dispatcher will bunch up to ``throughput`` message invocations together when
having elected an actor to run. Setting this to a higher number will increase
throughput but lower fairness, and vice versa. If you don't specify it
explicitly then it uses the default value defined in the 'akka.conf'
configuration file:
.. code-block:: ruby

View file

@ -288,6 +288,13 @@ assertions concerning received messages. Here is the full list:
does a conformance check; if you need the class to be equal, have a look at
:meth:`expectMsgAllClassOf` with a single given class argument.
* :meth:`expectMsgType[T: Manifest](d: Duration)`
An object which is an instance of the given type (after erasure) must be
received within the allotted time frame; the object will be returned. This
method is approximately equivalent to
``expectMsgClass(manifest[T].erasure)``.
* :meth:`expectMsgAnyOf[T](d: Duration, obj: T*): T`
An object must be received within the given time, and it must be equal (
@ -411,21 +418,25 @@ is between :obj:`min` and :obj:`max`, where the former defaults to zero. The
deadline calculated by adding the :obj:`max` parameter to the block's start
time is implicitly available within the block to all examination methods; if
you do not specify it, it is inherited from the innermost enclosing
:meth:`within` block. It should be noted that using :meth:`expectNoMsg` will
terminate upon reception of a message or at the deadline, whichever occurs
first; it follows that this examination usually is the last statement in a
:meth:`within` block.
It should be noted that if the last message-receiving assertion of the block is
:meth:`expectNoMsg` or :meth:`receiveWhile`, the final check of the
:meth:`within` is skipped in order to avoid false positives due to wake-up
latencies. This means that while individual contained assertions still use the
maximum time bound, the overall block may take arbitrarily longer in this case.
.. code-block:: scala
class SomeSpec extends WordSpec with MustMatchers with TestKit {
"A Worker" must {
"send timely replies" in {
val worker = actorOf(...)
within (50 millis) {
within (500 millis) {
worker ! "some work"
expectMsg("some result")
expectNoMsg
expectNoMsg // will block for the rest of the 500ms
Thread.sleep(1000) // will NOT make this block fail
}
}
}

View file

@ -9,6 +9,3 @@ version := "2.0-SNAPSHOT"
publishMavenStyle := true
publishTo := Some("Typesafe Publish Repo" at "http://repo.typesafe.com/typesafe/maven-releases/")
credentials += Credentials(Path.userHome / ".ivy2" / "typesafe-credentials")

View file

@ -4,25 +4,37 @@
import sbt._
import sbt.Keys._
import sbt.Load.BuildStructure
import sbt.classpath.ClasspathUtilities
import sbt.Project.Initialize
import sbt.CommandSupport._
import java.io.File
import scala.collection.mutable.{ Set => MutableSet }
object AkkaMicrokernelPlugin extends Plugin {
case class DistConfig(
outputDirectory: File,
configSourceDirs: Seq[File],
distJvmOptions: String,
distMainClass: String,
libFilter: File ⇒ Boolean,
additionalLibs: Seq[File])
val Dist = config("dist") extend (Runtime)
val dist = TaskKey[File]("dist", "Builds an Akka microkernel directory")
// TODO how to reuse keyword "clean" here instead (dist:clean)
val distClean = TaskKey[File]("clean-dist", "Removes Akka microkernel directory")
val distClean = TaskKey[Unit]("clean", "Removes Akka microkernel directory")
val outputDirectory = SettingKey[File]("output-directory")
val configSourceDirs = TaskKey[Seq[File]]("config-source-directories",
"Configuration files are copied from these directories")
val distJvmOptions = SettingKey[String]("jvm-options", "JVM parameters to use in start script")
val distJvmOptions = SettingKey[String]("kernel-jvm-options", "JVM parameters to use in start script")
val distMainClass = SettingKey[String]("kernel-main-class", "Kernel main class to use in start script")
val libFilter = SettingKey[File ⇒ Boolean]("lib-filter", "Filter of dependency jar files")
val additionalLibs = TaskKey[Seq[File]]("additional-libs", "Additional dependency jar files")
val distConfig = TaskKey[DistConfig]("dist-config")
override lazy val settings =
inConfig(Dist)(Seq(
@ -36,46 +48,59 @@ object AkkaMicrokernelPlugin extends Plugin {
distJvmOptions := "-Xms1024M -Xmx1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC",
distMainClass := "akka.kernel.Main",
libFilter := { f ⇒ true },
additionalLibs <<= defaultAdditionalLibs)) ++
additionalLibs <<= defaultAdditionalLibs,
distConfig <<= (outputDirectory, configSourceDirs, distJvmOptions, distMainClass, libFilter, additionalLibs) map DistConfig)) ++
Seq(
dist <<= (dist in Dist).identity)
private def distTask: Initialize[Task[File]] =
(outputDirectory, sourceDirectory, crossTarget, dependencyClasspath,
configSourceDirs, distJvmOptions, distMainClass, libFilter, streams) map {
(outDir, src, tgt, cp, configSrc, jvmOptions, mainClass, libFilt, s) ⇒
(distConfig, sourceDirectory, crossTarget, dependencyClasspath, projectDependencies, allDependencies, buildStructure, state, streams) map {
(conf, src, tgt, cp, projDeps, allDeps, buildStruct, st, s) ⇒
if (isKernelProject(allDeps)) {
val log = s.log
val distBinPath = outDir / "bin"
val distConfigPath = outDir / "config"
val distDeployPath = outDir / "deploy"
val distLibPath = outDir / "lib"
// TODO how do I grab the additionalLibs setting? Can't add it in input tuple, limitation of number of elements in map of tuple.
val addLibs = Seq.empty[File]
log.info("Creating distribution %s ..." format outDir)
IO.createDirectory(outDir)
Scripts(jvmOptions, mainClass).writeScripts(distBinPath)
copyDirectories(configSrc, distConfigPath)
val distBinPath = conf.outputDirectory / "bin"
val distConfigPath = conf.outputDirectory / "config"
val distDeployPath = conf.outputDirectory / "deploy"
val distLibPath = conf.outputDirectory / "lib"
val subProjectDependencies: Set[SubProjectInfo] = allSubProjectDependencies(projDeps, buildStruct, st)
log.info("Creating distribution %s ..." format conf.outputDirectory)
IO.createDirectory(conf.outputDirectory)
Scripts(conf.distJvmOptions, conf.distMainClass).writeScripts(distBinPath)
copyDirectories(conf.configSourceDirs, distConfigPath)
copyJars(tgt, distDeployPath)
copyFiles(libFiles(cp, libFilt), distLibPath)
copyFiles(addLibs, distLibPath)
copyFiles(libFiles(cp, conf.libFilter), distLibPath)
copyFiles(conf.additionalLibs, distLibPath)
for (subTarget <- subProjectDependencies.map(_.target)) {
copyJars(subTarget, distLibPath)
}
log.info("Distribution created.")
outDir
}
private def distCleanTask: Initialize[Task[File]] =
(outputDirectory, streams) map { (outDir, s) ⇒
val log = s.log
log.info("Cleaning " + outDir)
IO.delete(outDir)
outDir
conf.outputDirectory
}
def defaultConfigSourceDirs = (sourceDirectory, unmanagedResourceDirectories) map { (src, resources) ⇒
Seq(src / "main" / "config") ++ resources
private def distCleanTask: Initialize[Task[Unit]] =
(outputDirectory, allDependencies, streams) map { (outDir, deps, s) ⇒
if (isKernelProject(deps)) {
val log = s.log
log.info("Cleaning " + outDir)
IO.delete(outDir)
}
}
def isKernelProject(dependencies: Seq[ModuleID]): Boolean = {
dependencies.exists(moduleId => moduleId.organization == "se.scalablesolutions.akka" && moduleId.name == "akka-kernel")
}
private def defaultConfigSourceDirs = (sourceDirectory, unmanagedResourceDirectories) map { (src, resources) ⇒
Seq(src / "config", src / "main" / "config") ++ resources
}
def defaultAdditionalLibs = (libraryDependencies) map { (libs) ⇒
private def defaultAdditionalLibs = (libraryDependencies) map { (libs) ⇒
Seq.empty[File]
}
@ -146,6 +171,67 @@ object AkkaMicrokernelPlugin extends Plugin {
val (libs, directories) = classpath.map(_.data).partition(ClasspathUtilities.isArchive)
libs.map(_.asFile).filter(libFilter)
}
private def allSubProjectDependencies(projDeps: Seq[ModuleID], buildStruct: BuildStructure, state: State): Set[SubProjectInfo] = {
val buildUnit = buildStruct.units(buildStruct.root)
val uri = buildStruct.root
val allProjects = buildUnit.defined.map {
case (id, proj) => (ProjectRef(uri, id) -> proj)
}
val projDepsNames = projDeps.map(_.name)
def include(project: ResolvedProject): Boolean = projDepsNames.exists(_ == project.id)
val subProjects: Seq[SubProjectInfo] = allProjects.collect {
case (projRef, project) if include(project) => projectInfo(projRef, project, buildStruct, state, allProjects)
}.toList
val allSubProjects = subProjects.map(_.recursiveSubProjects).flatten.toSet
allSubProjects
}
private def projectInfo(projectRef: ProjectRef, project: ResolvedProject, buildStruct: BuildStructure, state: State,
allProjects: Map[ProjectRef, ResolvedProject]): SubProjectInfo = {
def optionalSetting[A](key: ScopedSetting[A]) = key in projectRef get buildStruct.data
def setting[A](key: ScopedSetting[A], errorMessage: => String) = {
optionalSetting(key) getOrElse {
logger(state).error(errorMessage);
throw new IllegalArgumentException()
}
}
def evaluateTask[T](taskKey: sbt.Project.ScopedKey[sbt.Task[T]]) = {
EvaluateTask.evaluateTask(buildStruct, taskKey, state, projectRef, false, EvaluateTask.SystemProcessors)
}
val projDeps: Seq[ModuleID] = evaluateTask(Keys.projectDependencies) match {
case Some(Value(moduleIds)) => moduleIds
case _ => Seq.empty
}
val projDepsNames = projDeps.map(_.name)
def include(project: ResolvedProject): Boolean = projDepsNames.exists(_ == project.id)
val subProjects = allProjects.collect {
case (projRef, proj) if include(proj) => projectInfo(projRef, proj, buildStruct, state, allProjects)
}.toList
val target = setting(Keys.crossTarget, "Missing crossTarget directory")
SubProjectInfo(project.id, target, subProjects)
}
private case class SubProjectInfo(id: String, target: File, subProjects: Seq[SubProjectInfo]) {
def recursiveSubProjects: Set[SubProjectInfo] = {
val flatSubProjects = for {
x <- subProjects
y <- x.recursiveSubProjects
} yield y
flatSubProjects.toSet + this
}
}
}

View file

@ -2,6 +2,8 @@ package akka.spring;
import akka.actor.*;
import scala.Option;
import java.util.concurrent.CountDownLatch;
public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedActorOne {
@ -22,7 +24,7 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc
}
@Override
public void preRestart(Throwable e) {
public void preRestart(Throwable e, Option<Object> msg) {
try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {}
latch.countDown();
}

View file

@ -2,6 +2,8 @@ package akka.spring;
import akka.actor.*;
import scala.Option;
import java.util.concurrent.CountDownLatch;
public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedActorTwo {
@ -22,7 +24,7 @@ public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedAc
}
@Override
public void preRestart(Throwable e) {
public void preRestart(Throwable e, Option<Object> msg) {
try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {}
latch.countDown();
}

View file

@ -282,8 +282,8 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false)
def receive = {
case update: Update[T] ⇒
self.tryReply(atomic(txFactory) { agent.ref alter update.function })
case update: Update[_] ⇒
self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T => T] })
case Get ⇒ self reply agent.get
case _ ⇒ ()
}
@ -298,8 +298,8 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false)
def receive = {
case update: Update[T] ⇒ try {
self.tryReply(atomic(txFactory) { agent.ref alter update.function })
case update: Update[_] ⇒ try {
self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T => T] })
} finally {
agent.resume
self.stop()

View file

@ -70,12 +70,12 @@ object TestActorRef {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance[T](manifest[T].erasure, noParams, noArgs) match {
case r: Right[_, T] ⇒ r.b
case l: Left[Exception, _] ⇒ throw new ActorInitializationException(
case Right(value) ⇒ value
case Left(exception) ⇒ throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", l.a)
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", exception)
}
}, address)
}

View file

@ -110,13 +110,12 @@ trait TestKitLight {
val senderOption = Some(testActor)
private var end: Duration = Duration.Inf
/*
* THIS IS A HACK: expectNoMsg and receiveWhile are bounded by `end`, but
* running them should not trigger an AssertionError, so mark their end
* time here and do not fail at the end of `within` if that time is not
* long gone.
/**
* if last assertion was expectNoMsg, disable timing failure upon within()
* block end.
*/
private var lastSoftTimeout: Duration = now - 5.millis
private var lastWasNoMsg = false
/**
* Stop test actor. Should be done at the end of the test unless relying on
@ -211,6 +210,8 @@ trait TestKitLight {
val rem = end - start
assert(rem >= min, "required min time " + min + " not possible, only " + format(min.unit, rem) + " left")
lastWasNoMsg = false
val max_diff = _max min rem
val prev_end = end
end = start + max_diff
@ -219,13 +220,8 @@ trait TestKitLight {
val diff = now - start
assert(min <= diff, "block took " + format(min.unit, diff) + ", should at least have been " + min)
/*
* caution: HACK AHEAD
*/
if (now - lastSoftTimeout > 5.millis) {
if (!lastWasNoMsg) {
assert(diff <= max_diff, "block took " + format(_max.unit, diff) + ", exceeding " + format(_max.unit, max_diff))
} else {
lastSoftTimeout -= 5.millis
}
ret
@ -302,6 +298,20 @@ trait TestKitLight {
f(o)
}
/**
* Same as `expectMsgType[T](remaining)`, but correctly treating the timeFactor.
*/
def expectMsgType[T](implicit m: Manifest[T]): T = expectMsgClass_internal(remaining, m.erasure.asInstanceOf[Class[T]])
/**
* Receive one message from the test actor and assert that it conforms to the
* given type (after erasure). Wait time is bounded by the given duration,
* with an AssertionFailure being thrown in case of timeout.
*
* @return the received object
*/
def expectMsgType[T](max: Duration)(implicit m: Manifest[T]): T = expectMsgClass_internal(max.dilated, m.erasure.asInstanceOf[Class[T]])
/**
* Same as `expectMsgClass(remaining, c)`, but correctly treating the timeFactor.
*/
@ -452,7 +462,7 @@ trait TestKitLight {
private def expectNoMsg_internal(max: Duration) {
val o = receiveOne(max)
assert(o eq null, "received unexpected message " + o)
lastSoftTimeout = now
lastWasNoMsg = true
}
/**
@ -501,7 +511,7 @@ trait TestKitLight {
}
val ret = doit(Nil)
lastSoftTimeout = now
lastWasNoMsg = true
ret
}
@ -541,6 +551,7 @@ trait TestKitLight {
} else {
queue.takeFirst
}
lastWasNoMsg = false
message match {
case null ⇒
lastMessage = NullMessage

View file

@ -174,7 +174,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), Some(2), Some(1000))
val ref = TestActorRef(new TActor {
def receiveT = { case _ ⇒ }
override def preRestart(reason: Throwable) { counter -= 1 }
override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 }
override def postRestart(reason: Throwable) { counter -= 1 }
}).start()
self.dispatcher = CallingThreadDispatcher.global