Cleaned up the Actor and Supervisor classes. Added implicit sender to actor ! methods, works with 'sender' field and 'reply'
This commit is contained in:
parent
9606dbfe3f
commit
805fac6ebb
22 changed files with 288 additions and 158 deletions
|
|
@ -160,6 +160,8 @@ private[akka] sealed case class AspectInit(
|
|||
*/
|
||||
@Aspect("perInstance")
|
||||
private[akka] sealed class ActiveObjectAspect {
|
||||
import Actor._
|
||||
|
||||
@volatile var isInitialized = false
|
||||
var target: Class[_] = _
|
||||
var actor: Dispatcher = _
|
||||
|
|
|
|||
|
|
@ -23,15 +23,22 @@ import org.codehaus.aspectwerkz.proxy.Uuid
|
|||
|
||||
import org.multiverse.utils.ThreadLocalTransaction._
|
||||
|
||||
@serializable sealed abstract class LifeCycleMessage
|
||||
/**
|
||||
* Mix in this trait to give an actor TransactionRequired semantics.
|
||||
* Equivalent to invoking the 'makeTransactionRequired' method in the actor.
|
||||
*/
|
||||
trait TransactionRequired { this: Actor =>
|
||||
makeTransactionRequired
|
||||
}
|
||||
|
||||
@serializable sealed trait LifeCycleMessage
|
||||
case class Init(config: AnyRef) extends LifeCycleMessage
|
||||
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage
|
||||
case class Restart(reason: AnyRef) extends LifeCycleMessage
|
||||
case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage
|
||||
case class Kill(killer: Actor) extends LifeCycleMessage
|
||||
//case object TransactionalInit extends LifeCycleMessage
|
||||
case object Kill extends LifeCycleMessage
|
||||
|
||||
class ActorKilledException(val killed: Actor, val killer: Actor) extends RuntimeException("Actor [" + killed + "] killed by [" + killer + "]")
|
||||
class ActorKilledException(val killed: Actor) extends RuntimeException("Actor [" + killed + "] was killed by a Kill message")
|
||||
|
||||
sealed abstract class DispatcherType
|
||||
object DispatcherType {
|
||||
|
|
@ -55,13 +62,21 @@ object Actor {
|
|||
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
|
||||
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
|
||||
|
||||
implicit val any: AnyRef = this
|
||||
|
||||
def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() {
|
||||
start
|
||||
def receive = body
|
||||
}
|
||||
|
||||
def actor(body: => Unit)(matcher: PartialFunction[Any, Unit]): Actor = new Actor() {
|
||||
start
|
||||
body
|
||||
def receive = matcher
|
||||
}
|
||||
|
||||
def actor(lifeCycleConfig: LifeCycle)(body: PartialFunction[Any, Unit]): Actor = new Actor() {
|
||||
lifeCycle = Some(lifeCycleConfig)
|
||||
lifeCycle = lifeCycleConfig
|
||||
start
|
||||
def receive = body
|
||||
}
|
||||
|
|
@ -83,10 +98,15 @@ object Actor {
|
|||
trait Actor extends Logging with TransactionManagement {
|
||||
ActorRegistry.register(this)
|
||||
|
||||
implicit val self: AnyRef = this
|
||||
|
||||
// FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
|
||||
val uuid = Uuid.newUuid.toString
|
||||
|
||||
// ====================================
|
||||
// private fields
|
||||
// ====================================
|
||||
|
||||
@volatile private var _isRunning: Boolean = false
|
||||
@volatile private var _isShutDown: Boolean = false
|
||||
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
|
||||
|
|
@ -97,12 +117,28 @@ trait Actor extends Logging with TransactionManagement {
|
|||
private[akka] var _mailbox: MessageQueue = _
|
||||
private[akka] var _supervisor: Option[Actor] = None
|
||||
|
||||
// ====================================
|
||||
// protected fields
|
||||
// ====================================
|
||||
|
||||
/**
|
||||
* This field should normally not be touched by user code, which should instead use the 'reply' method.
|
||||
* The 'sender' field holds the sender of the message currently being processed.
|
||||
* <p/>
|
||||
* If the sender was an actor then it is defined as 'Some(senderActor)' and
|
||||
* if the sender was of some other instance then it is defined as 'None'.
|
||||
* <p/>
|
||||
* This sender reference can be used together with the '!' method for request/reply
|
||||
* message exchanges and which is in many ways better than using the '!!' method
|
||||
* which will make the sender wait for a reply using a *blocking* future.
|
||||
*/
|
||||
protected[this] var sender: Option[Actor] = None
|
||||
|
||||
/**
|
||||
* The 'senderFuture' field should normally not be touched by user code, which should instead use the 'reply' method.
|
||||
* But it can be used for advanced use-cases when one might want to store away the future and
|
||||
* resolve it later and/or somewhere else.
|
||||
*/
|
||||
protected var senderFuture: Option[CompletableFutureResult] = None
|
||||
protected[this] var senderFuture: Option[CompletableFutureResult] = None
|
||||
|
||||
// ====================================
|
||||
// ==== USER CALLBACKS TO OVERRIDE ====
|
||||
|
|
@ -127,30 +163,19 @@ trait Actor extends Logging with TransactionManagement {
|
|||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
*
|
||||
* User can (and is encouraged to) override the default configuration (newEventBasedThreadPoolDispatcher)
|
||||
* so it fits the specific use-case that the actor is used for.
|
||||
* <p/>
|
||||
* It is beneficial to have actors share the same dispatcher, easily +100 actors can share the same.
|
||||
* The default dispatcher is the <tt>Dispatchers.globalEventBasedThreadPoolDispatcher</tt>.
|
||||
* This means that all actors will share the same event-driven thread-pool based dispatcher.
|
||||
* <p/>
|
||||
* But if you are running many many actors then it can be a good idea to have split them up in terms of
|
||||
* dispatcher sharing.
|
||||
* You can override it so it fits the specific use-case that the actor is used for.
|
||||
* See the <tt>se.scalablesolutions.akka.dispatch.Dispatchers</tt> class for the different
|
||||
* dispatchers available.
|
||||
* <p/>
|
||||
* Default is that all actors that are created and spawned from within this actor is sharing the same
|
||||
* dispatcher as its creator.
|
||||
* <pre>
|
||||
* val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
|
||||
* dispatcher
|
||||
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
||||
* .setCorePoolSize(16)
|
||||
* .setMaxPoolSize(128)
|
||||
* .setKeepAliveTimeInMillis(60000)
|
||||
* .setRejectionPolicy(new CallerRunsPolicy)
|
||||
* .buildThreadPool
|
||||
* </pre>
|
||||
* The default is also that all actors that are created and spawned from within this actor
|
||||
* is sharing the same dispatcher as its creator.
|
||||
*/
|
||||
protected[akka] var messageDispatcher: MessageDispatcher = {
|
||||
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName)
|
||||
val dispatcher = Dispatchers.globalEventBasedThreadPoolDispatcher
|
||||
_mailbox = dispatcher.messageQueue
|
||||
dispatcher
|
||||
}
|
||||
|
|
@ -190,7 +215,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
*
|
||||
* Defines the life-cycle for a supervised actor. Default is 'LifeCycle(Permanent)' but can be overridden.
|
||||
*/
|
||||
@volatile var lifeCycle: Option[LifeCycle] = Some(LifeCycle(Permanent))
|
||||
@volatile var lifeCycle: LifeCycle = LifeCycle(Permanent)
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -270,7 +295,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
/**
|
||||
* Starts up the actor and its message queue.
|
||||
*/
|
||||
def start = synchronized {
|
||||
def start: Actor = synchronized {
|
||||
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that have been shut down with 'exit'")
|
||||
if (!_isRunning) {
|
||||
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
|
|
@ -279,6 +304,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
//if (isTransactional) this !! TransactionalInit
|
||||
}
|
||||
log.info("[%s] has started", toString)
|
||||
this
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -303,10 +329,28 @@ trait Actor extends Logging with TransactionManagement {
|
|||
|
||||
/**
|
||||
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
|
||||
* <p/>
|
||||
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
|
||||
* This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable.
|
||||
* <p/>
|
||||
* If invoked from within another object then add this import to resolve the implicit argument:
|
||||
* <pre>
|
||||
* import se.scalablesolutions.akka.actor.Actor._
|
||||
* </pre>
|
||||
*/
|
||||
def !(message: AnyRef) =
|
||||
if (_isRunning) postMessageToMailbox(message)
|
||||
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
def !(message: AnyRef)(implicit sender: AnyRef) = {
|
||||
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
|
||||
else None
|
||||
if (_isRunning) postMessageToMailbox(message, from)
|
||||
else throw new IllegalStateException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
def send(message: AnyRef) = {
|
||||
if (_isRunning) postMessageToMailbox(message, None)
|
||||
else throw new IllegalStateException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message.
|
||||
|
|
@ -315,6 +359,8 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
||||
* implement request/response message exchanges.
|
||||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
|
|
@ -330,7 +376,8 @@ trait Actor extends Logging with TransactionManagement {
|
|||
else None
|
||||
}
|
||||
getResultOrThrowException(future)
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
} else throw new IllegalStateException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message.
|
||||
|
|
@ -339,6 +386,8 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
|
||||
* implement request/response message exchanges.
|
||||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
|
|
@ -354,22 +403,31 @@ trait Actor extends Logging with TransactionManagement {
|
|||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
|
||||
future.awaitBlocking
|
||||
getResultOrThrowException(future).get
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
} else throw new IllegalStateException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
|
||||
/**
|
||||
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
|
||||
* being processed.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Does only work together with the actor <code>!!</code> method and/or active objects not annotated
|
||||
* with <code>@oneway</code>.
|
||||
*/
|
||||
protected[this] def reply(message: AnyRef) = senderFuture match {
|
||||
case None => throw new IllegalStateException(
|
||||
protected[this] def reply(message: AnyRef) = {
|
||||
sender match {
|
||||
case Some(senderActor) =>
|
||||
senderActor ! message
|
||||
case None =>
|
||||
senderFuture match {
|
||||
case None =>
|
||||
throw new IllegalStateException(
|
||||
"\n\tNo sender in scope, can't reply. " +
|
||||
"\n\tHave you used the '!' message send or the '@oneway' active object annotation? " +
|
||||
"\n\tYou have probably used the '!' method to either; " +
|
||||
"\n\t\t1. Send a message to a remote actor" +
|
||||
"\n\t\t2. Send a message from an instance that is *not* an actor" +
|
||||
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
|
||||
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future that will be bound by the argument passed to 'reply'." )
|
||||
case Some(future) => future.completeWithResult(message)
|
||||
case Some(future) =>
|
||||
future.completeWithResult(message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -385,7 +443,8 @@ trait Actor extends Logging with TransactionManagement {
|
|||
messageDispatcher = dispatcher
|
||||
_mailbox = messageDispatcher.messageQueue
|
||||
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
} else throw new IllegalArgumentException("Can not swap dispatcher for " + toString + " after it has been started")
|
||||
} else throw new IllegalArgumentException(
|
||||
"Can not swap dispatcher for " + toString + " after it has been started")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -411,7 +470,8 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* </pre>
|
||||
*/
|
||||
def makeTransactionRequired = synchronized {
|
||||
if (_isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started")
|
||||
if (_isRunning) throw new IllegalArgumentException(
|
||||
"Can not make actor transaction required after it has been started")
|
||||
else isTransactionRequiresNew = true
|
||||
}
|
||||
|
||||
|
|
@ -428,10 +488,12 @@ trait Actor extends Logging with TransactionManagement {
|
|||
protected[this] def link(actor: Actor) = {
|
||||
if (_isRunning) {
|
||||
_linkedActors.add(actor)
|
||||
if (actor._supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
|
||||
if (actor._supervisor.isDefined) throw new IllegalStateException(
|
||||
"Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
|
||||
actor._supervisor = Some(this)
|
||||
log.debug("Linking actor [%s] to actor [%s]", actor, this)
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
} else throw new IllegalStateException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -445,7 +507,8 @@ trait Actor extends Logging with TransactionManagement {
|
|||
_linkedActors.remove(actor)
|
||||
actor._supervisor = None
|
||||
log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
} else throw new IllegalStateException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -527,7 +590,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
// ==== IMPLEMENTATION DETAILS ====
|
||||
// ================================
|
||||
|
||||
private def postMessageToMailbox(message: AnyRef): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
private def postMessageToMailbox(message: AnyRef, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
if (_remoteAddress.isDefined) {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
|
|
@ -541,7 +604,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
RemoteProtocolBuilder.setMessage(message, requestBuilder)
|
||||
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
|
||||
} else {
|
||||
val handle = new MessageInvocation(this, message, None, currentTransaction.get)
|
||||
val handle = new MessageInvocation(this, message, None, sender, currentTransaction.get)
|
||||
handle.send
|
||||
}
|
||||
}
|
||||
|
|
@ -560,10 +623,11 @@ trait Actor extends Logging with TransactionManagement {
|
|||
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
||||
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
|
||||
if (future.isDefined) future.get
|
||||
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
||||
else throw new IllegalStateException(
|
||||
"Expected a future from remote call to actor " + toString)
|
||||
} else {
|
||||
val future = new DefaultCompletableFutureResult(timeout)
|
||||
val handle = new MessageInvocation(this, message, Some(future), currentTransaction.get)
|
||||
val handle = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
|
||||
handle.send
|
||||
future
|
||||
}
|
||||
|
|
@ -581,16 +645,17 @@ trait Actor extends Logging with TransactionManagement {
|
|||
setTransaction(messageHandle.tx)
|
||||
|
||||
val message = messageHandle.message //serializeMessage(messageHandle.message)
|
||||
val future = messageHandle.future
|
||||
senderFuture = messageHandle.future
|
||||
sender = messageHandle.sender
|
||||
|
||||
try {
|
||||
senderFuture = future
|
||||
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
|
||||
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
|
||||
} catch {
|
||||
case e =>
|
||||
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
|
||||
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
|
||||
if (future.isDefined) future.get.completeWithException(this, e)
|
||||
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
|
||||
else e.printStackTrace
|
||||
} finally {
|
||||
clearTransaction
|
||||
|
|
@ -601,23 +666,25 @@ trait Actor extends Logging with TransactionManagement {
|
|||
setTransaction(messageHandle.tx)
|
||||
|
||||
val message = messageHandle.message //serializeMessage(messageHandle.message)
|
||||
val future = messageHandle.future
|
||||
senderFuture = messageHandle.future
|
||||
sender = messageHandle.sender
|
||||
|
||||
def proceed = {
|
||||
try {
|
||||
incrementTransaction
|
||||
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
|
||||
else throw new IllegalArgumentException("Actor " + toString + " could not process message [" + message + "] since no matching 'case' clause in its 'receive' method could be found")
|
||||
else throw new IllegalArgumentException(
|
||||
"Actor " + toString + " could not process message [" + message + "]" +
|
||||
"\n\tsince no matching 'case' clause in its 'receive' method could be found")
|
||||
} finally {
|
||||
decrementTransaction
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
senderFuture = future
|
||||
if (isTransactionRequiresNew && !isTransactionInScope) {
|
||||
if (senderFuture.isEmpty) throw new StmException(
|
||||
"\n\tCan't continue transaction in a one-way fire-forget message send" +
|
||||
"Can't continue transaction in a one-way fire-forget message send" +
|
||||
"\n\tE.g. using Actor '!' method or Active Object 'void' method" +
|
||||
"\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type")
|
||||
atomic {
|
||||
|
|
@ -627,12 +694,9 @@ trait Actor extends Logging with TransactionManagement {
|
|||
} catch {
|
||||
case e =>
|
||||
e.printStackTrace
|
||||
|
||||
if (future.isDefined) future.get.completeWithException(this, e)
|
||||
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
|
||||
else e.printStackTrace
|
||||
|
||||
clearTransaction // need to clear currentTransaction before call to supervisor
|
||||
|
||||
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
|
||||
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
|
||||
} finally {
|
||||
|
|
@ -651,8 +715,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
case HotSwap(code) => _hotswap = code
|
||||
case Restart(reason) => restart(reason)
|
||||
case Exit(dead, reason) => handleTrapExit(dead, reason)
|
||||
case Kill(killer) => throw new ActorKilledException(this, killer)
|
||||
// case TransactionalInit => initTransactionalState
|
||||
case Kill => throw new ActorKilledException(this)
|
||||
}
|
||||
|
||||
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
|
||||
|
|
@ -663,7 +726,9 @@ trait Actor extends Logging with TransactionManagement {
|
|||
case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
|
||||
case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
|
||||
}
|
||||
} else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' member field set to non-empty list of exception classes - can't proceed " + toString)
|
||||
} else throw new IllegalStateException(
|
||||
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
|
||||
"\n\tto non-empty list of exception classes - can't proceed " + toString)
|
||||
} else {
|
||||
if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
|
||||
}
|
||||
|
|
@ -672,8 +737,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
private[this] def restartLinkedActors(reason: AnyRef) = {
|
||||
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
|
||||
actor.lifeCycle match {
|
||||
case None => throw new IllegalStateException("Actor [" + actor.id + "] does not have a life-cycle defined.")
|
||||
case Some(LifeCycle(scope, _)) => {
|
||||
case LifeCycle(scope, _) => {
|
||||
scope match {
|
||||
case Permanent =>
|
||||
actor.restart(reason)
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio
|
|||
* which is licensed under the Apache 2 License.
|
||||
*/
|
||||
class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
|
||||
lifeCycle = Some(LifeCycle(Permanent))
|
||||
lifeCycle = LifeCycle(Permanent)
|
||||
|
||||
def receive = {
|
||||
case UnSchedule =>
|
||||
|
|
|
|||
|
|
@ -8,20 +8,10 @@ import se.scalablesolutions.akka.config.ScalaConfig._
|
|||
import se.scalablesolutions.akka.config.{ConfiguratorRepository, Configurator}
|
||||
import se.scalablesolutions.akka.util.Helpers._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
/**
|
||||
* Messages that the supervisor responds to and returns.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
sealed abstract class SupervisorMessage
|
||||
case object StartSupervisor extends SupervisorMessage
|
||||
case object StopSupervisor extends SupervisorMessage
|
||||
case class ConfigureSupervisor(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage
|
||||
case object ConfigSupervisorSuccess extends SupervisorMessage
|
||||
|
||||
sealed abstract class FaultHandlingStrategy
|
||||
case class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
|
||||
case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
|
||||
|
|
@ -93,7 +83,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
|
|||
extends Actor with Logging with Configurator {
|
||||
trapExit = List(classOf[Throwable])
|
||||
faultHandler = Some(handler)
|
||||
//dispatcher = Dispatchers.newThreadBasedDispatcher(this)
|
||||
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
|
||||
|
||||
val actors = new ConcurrentHashMap[String, Actor]
|
||||
|
||||
|
|
@ -103,9 +93,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
|
|||
|
||||
def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName)
|
||||
|
||||
override def start = {
|
||||
override def start = synchronized {
|
||||
ConfiguratorRepository.registerConfigurator(this)
|
||||
actors.values.toArray.toList.foreach(println)
|
||||
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
|
||||
actor.start
|
||||
log.info("Starting actor: %s", actor)
|
||||
|
|
@ -113,7 +102,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
|
|||
super[Actor].start
|
||||
}
|
||||
|
||||
override def stop = {
|
||||
override def stop = synchronized {
|
||||
super[Actor].stop
|
||||
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
|
||||
actor.stop
|
||||
|
|
@ -123,21 +112,20 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
|
|||
}
|
||||
|
||||
def receive = {
|
||||
case _ => throw new IllegalArgumentException("Supervisor does not respond to any messages")
|
||||
case unknown => throw new IllegalArgumentException("Supervisor does not respond to any messages. Unknown message [" + unknown + "]")
|
||||
}
|
||||
|
||||
def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
|
||||
case SupervisorConfig(_, servers) =>
|
||||
servers.map(server =>
|
||||
server match {
|
||||
case Supervise(actor, lifecycle) =>
|
||||
case Supervise(actor, lifeCycle) =>
|
||||
actors.put(actor.getClass.getName, actor)
|
||||
actor.lifeCycle = Some(lifecycle)
|
||||
actor.lifeCycle = lifeCycle
|
||||
startLink(actor)
|
||||
|
||||
case SupervisorConfig(_, _) => // recursive configuration
|
||||
val supervisor = factory.newInstanceFor(server.asInstanceOf[SupervisorConfig])
|
||||
supervisor ! StartSupervisor
|
||||
factory.newInstanceFor(server.asInstanceOf[SupervisorConfig]).start
|
||||
// FIXME what to do with recursively supervisors?
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,6 +40,8 @@ import se.scalablesolutions.akka.actor.Actor
|
|||
*/
|
||||
object Dispatchers {
|
||||
|
||||
object globalEventBasedThreadPoolDispatcher extends EventBasedThreadPoolDispatcher("global:eventbased:dispatcher")
|
||||
|
||||
/**
|
||||
* Creates an event based dispatcher serving multiple (millions) of actors through a thread pool.
|
||||
* Has a fluent builder interface for configuring its semantics.
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ trait MessageDemultiplexer {
|
|||
class MessageInvocation(val receiver: Actor,
|
||||
val message: AnyRef,
|
||||
val future: Option[CompletableFutureResult],
|
||||
val sender: Option[Actor],
|
||||
val tx: Option[Transaction]) {
|
||||
if (receiver == null) throw new IllegalArgumentException("receiver is null")
|
||||
if (message == null) throw new IllegalArgumentException("message is null")
|
||||
|
|
@ -65,6 +66,10 @@ class MessageInvocation(val receiver: Actor,
|
|||
}
|
||||
|
||||
override def toString(): String = synchronized {
|
||||
"MessageInvocation[message = " + message + ", receiver = " + receiver + ", future = " + future + ", tx = " + tx + "]"
|
||||
"MessageInvocation[message = " + message +
|
||||
", receiver = " + receiver +
|
||||
", sender = " + sender +
|
||||
", future = " + future +
|
||||
", tx = " + tx + "]"
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,6 +155,7 @@ class RemoteClientHandler(val name: String,
|
|||
}
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
|
||||
import Actor._
|
||||
try {
|
||||
val result = event.getMessage
|
||||
if (result.isInstanceOf[RemoteReply]) {
|
||||
|
|
|
|||
|
|
@ -113,6 +113,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
|
|||
}
|
||||
|
||||
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
|
||||
import Actor._
|
||||
log.debug("Dispatching to remote actor [%s]", request.getTarget)
|
||||
val actor = createActor(request.getTarget, request.getTimeout)
|
||||
actor.start
|
||||
|
|
|
|||
|
|
@ -380,5 +380,3 @@ object Test5 extends Application {
|
|||
|
||||
//System.gc
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,51 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
object state {
|
||||
var s = "NIL"
|
||||
}
|
||||
|
||||
class ReplyActor extends Actor {
|
||||
def receive = {
|
||||
case "Send" => reply("Reply")
|
||||
case "SendImplicit" => sender.get ! "ReplyImplicit"
|
||||
}
|
||||
}
|
||||
|
||||
class SenderActor(replyActor: Actor) extends Actor {
|
||||
def receive = {
|
||||
case "Init" => replyActor ! "Send"
|
||||
case "Reply" => state.s = "Reply"
|
||||
case "InitImplicit" => replyActor ! "SendImplicit"
|
||||
case "ReplyImplicit" => state.s = "ReplyImplicit"
|
||||
}
|
||||
}
|
||||
|
||||
class ActorFireForgetRequestReplyTest extends JUnitSuite {
|
||||
|
||||
@Test
|
||||
def shouldReplyToBangMessageUsingReply = {
|
||||
import Actor._
|
||||
val replyActor = new ReplyActor
|
||||
replyActor.start
|
||||
val senderActor = new SenderActor(replyActor)
|
||||
senderActor.start
|
||||
senderActor ! "Init"
|
||||
Thread.sleep(10000)
|
||||
assert("Reply" === state.s)
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldReplyToBangMessageUsingImplicitSender = {
|
||||
import Actor._
|
||||
val replyActor = new ReplyActor
|
||||
replyActor.start
|
||||
val senderActor = new SenderActor(replyActor)
|
||||
senderActor.start
|
||||
senderActor ! "InitImplicit"
|
||||
Thread.sleep(10000)
|
||||
assert("ReplyImplicit" === state.s)
|
||||
}
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@ import org.junit.Test
|
|||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
|
||||
class EventBasedSingleThreadActorTest extends JUnitSuite {
|
||||
import Actor._
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
class TestActor extends Actor {
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ class EventBasedSingleThreadDispatcherTest extends JUnitSuite {
|
|||
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
|
||||
dispatcher.start
|
||||
for (i <- 0 until 100) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None, None))
|
||||
}
|
||||
assert(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assert(!threadingIssueDetected.get)
|
||||
|
|
@ -73,8 +73,8 @@ class EventBasedSingleThreadDispatcherTest extends JUnitSuite {
|
|||
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
|
||||
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
|
||||
dispatcher.start
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new Object, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new Object, None, None, None))
|
||||
assert(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assert(!threadingIssueDetected.get)
|
||||
}
|
||||
|
|
@ -106,8 +106,8 @@ class EventBasedSingleThreadDispatcherTest extends JUnitSuite {
|
|||
})
|
||||
dispatcher.start
|
||||
for (i <- 0 until 100) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None, None))
|
||||
}
|
||||
assert(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assert(!threadingIssueDetected.get)
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import org.scalatest.junit.JUnitSuite
|
|||
import org.junit.Test
|
||||
|
||||
class EventBasedThreadPoolActorTest extends JUnitSuite {
|
||||
import Actor._
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
class TestActor extends Actor {
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ class EventBasedThreadPoolDispatcherTest extends JUnitSuite {
|
|||
})
|
||||
dispatcher.start
|
||||
for (i <- 0 until 10) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None, None))
|
||||
}
|
||||
assert(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assert(!threadingIssueDetected.get)
|
||||
|
|
@ -109,10 +109,10 @@ class EventBasedThreadPoolDispatcherTest extends JUnitSuite {
|
|||
}
|
||||
})
|
||||
dispatcher.start
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None, None))
|
||||
|
||||
handlersBarrier.await(5, TimeUnit.SECONDS)
|
||||
assert(!threadingIssueDetected.get)
|
||||
|
|
@ -151,8 +151,8 @@ class EventBasedThreadPoolDispatcherTest extends JUnitSuite {
|
|||
})
|
||||
dispatcher.start
|
||||
for (i <- 0 until 100) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None, None))
|
||||
}
|
||||
assert(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assert(!threadingIssueDetected.get)
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ class RemoteActorSpecActorBidirectional extends Actor {
|
|||
}
|
||||
|
||||
class RemoteActorTest extends JUnitSuite {
|
||||
import Actor._
|
||||
akka.Config.config
|
||||
new Thread(new Runnable() {
|
||||
def run = {
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ object Log {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteSupervisorTest extends JUnitSuite {
|
||||
import Actor._
|
||||
akka.Config.config
|
||||
new Thread(new Runnable() {
|
||||
def run = {
|
||||
|
|
@ -35,7 +36,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldStartServer = {
|
||||
Log.messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
|
|
@ -45,7 +46,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldKillSingleActorOneForOne = {
|
||||
Log.messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! BinaryString("Die")
|
||||
|
|
@ -59,7 +60,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldCallKillCallSingleActorOneForOne = {
|
||||
Log.messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
|
|
@ -87,7 +88,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldKillSingleActorAllForOne = {
|
||||
Log.messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! BinaryString("Die")
|
||||
|
|
@ -101,7 +102,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldCallKillCallSingleActorAllForOne = {
|
||||
Log.messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
|
|
@ -129,7 +130,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldKillMultipleActorsOneForOne = {
|
||||
Log.messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 !! BinaryString("Die")
|
||||
|
|
@ -143,7 +144,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
def tesCallKillCallMultipleActorsOneForOne = {
|
||||
Log.messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
|
|
@ -187,7 +188,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldKillMultipleActorsAllForOne = {
|
||||
Log.messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! BinaryString("Die")
|
||||
|
|
@ -201,7 +202,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
def tesCallKillCallMultipleActorsAllForOne = {
|
||||
Log.messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
|
|
@ -246,7 +247,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayKillSingleActorOneForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
pingpong1 ! BinaryString("Die")
|
||||
Thread.sleep(500)
|
||||
|
|
@ -258,7 +259,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayCallKillCallSingleActorOneForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
pingpong1 ! OneWay
|
||||
Thread.sleep(500)
|
||||
|
|
@ -282,7 +283,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayKillSingleActorAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 ! BinaryString("Die")
|
||||
|
|
@ -296,7 +297,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayCallKillCallSingleActorAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
|
||||
|
|
@ -324,7 +325,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayKillMultipleActorsOneForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 ! BinaryString("Die")
|
||||
|
|
@ -338,7 +339,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
def tesOneWayCallKillCallMultipleActorsOneForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
|
||||
|
|
@ -382,7 +383,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayKillMultipleActorsAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 ! BinaryString("Die")
|
||||
|
|
@ -396,7 +397,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
def tesOneWayCallKillCallMultipleActorsAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
pingpong1 ! BinaryString("Ping")
|
||||
|
|
@ -442,7 +443,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
@Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getNestedSupervisorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! BinaryString("Die")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import org.junit.Test
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class SupervisorTest extends JUnitSuite {
|
||||
import Actor._
|
||||
|
||||
var messageLog: String = ""
|
||||
var oneWayLog: String = ""
|
||||
|
|
@ -24,7 +25,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldStartServer = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
|
|
@ -34,7 +35,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldKillSingleActorOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! Die
|
||||
|
|
@ -48,7 +49,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldCallKillCallSingleActorOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
|
|
@ -76,7 +77,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldKillSingleActorAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! Die
|
||||
|
|
@ -90,7 +91,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldCallKillCallSingleActorAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
|
|
@ -118,7 +119,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldKillMultipleActorsOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 !! Die
|
||||
|
|
@ -132,7 +133,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
def tesCallKillCallMultipleActorsOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
|
|
@ -176,7 +177,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldKillMultipleActorsAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! Die
|
||||
|
|
@ -190,7 +191,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
def tesCallKillCallMultipleActorsAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
|
|
@ -234,7 +235,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayKillSingleActorOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
pingpong1 ! Die
|
||||
Thread.sleep(500)
|
||||
|
|
@ -246,7 +247,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayCallKillCallSingleActorOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
pingpong1 ! OneWay
|
||||
Thread.sleep(500)
|
||||
|
|
@ -269,7 +270,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayKillSingleActorAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 ! Die
|
||||
|
|
@ -283,7 +284,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayCallKillCallSingleActorAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 ! Ping).getOrElse("nil")
|
||||
|
|
@ -311,7 +312,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayKillMultipleActorsOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 ! Die
|
||||
|
|
@ -325,7 +326,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
def tesOneWayCallKillCallMultipleActorsOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 ! Ping).getOrElse("nil")
|
||||
|
|
@ -369,7 +370,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldOneWayKillMultipleActorsAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 ! Die
|
||||
|
|
@ -383,7 +384,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
def tesOneWayCallKillCallMultipleActorsAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
pingpong1 ! Ping
|
||||
|
|
@ -429,7 +430,7 @@ class SupervisorTest extends JUnitSuite {
|
|||
@Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getNestedSupervisorsAllForOneConf
|
||||
sup ! StartSupervisor
|
||||
sup.start
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! Die
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import org.junit.Test
|
|||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
|
||||
class ThreadBasedActorTest extends JUnitSuite {
|
||||
import Actor._
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
class TestActor extends Actor {
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ class ThreadBasedDispatcherTest extends JUnitSuite {
|
|||
val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
|
||||
dispatcher.start
|
||||
for (i <- 0 until 100) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None, None))
|
||||
}
|
||||
assert(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assert(!threadingIssueDetected.get)
|
||||
|
|
@ -78,7 +78,7 @@ class ThreadBasedDispatcherTest extends JUnitSuite {
|
|||
})
|
||||
dispatcher.start
|
||||
for (i <- 0 until 100) {
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
|
||||
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None, None))
|
||||
}
|
||||
assert(handleLatch.await(5, TimeUnit.SECONDS))
|
||||
assert(!threadingIssueDetected.get)
|
||||
|
|
|
|||
|
|
@ -285,7 +285,7 @@ object AMQP extends Actor {
|
|||
val passive: Boolean,
|
||||
val durable: Boolean,
|
||||
val configurationArguments: Map[java.lang.String, Object])
|
||||
extends FaultTolerantConnectionActor { self: Consumer =>
|
||||
extends FaultTolerantConnectionActor { consumer: Consumer =>
|
||||
|
||||
faultHandler = Some(OneForOneStrategy(5, 5000))
|
||||
trapExit = List(classOf[Throwable])
|
||||
|
|
@ -370,7 +370,7 @@ object AMQP extends Actor {
|
|||
} catch {
|
||||
case cause =>
|
||||
log.error("Delivery of message to MessageConsumerListener [%s] failed due to [%s]", listener.toString(exchangeName), cause.toString)
|
||||
self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect
|
||||
consumer ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -384,7 +384,7 @@ object AMQP extends Actor {
|
|||
case Some(listener) =>
|
||||
log.warning("MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]",
|
||||
listener.toString(exchangeName), signal.getReference, signal.getReason)
|
||||
self ! UnregisterMessageConsumerListener(listener)
|
||||
consumer ! UnregisterMessageConsumerListener(listener)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
@ -422,8 +422,6 @@ object AMQP extends Actor {
|
|||
}
|
||||
|
||||
trait FaultTolerantConnectionActor extends Actor {
|
||||
lifeCycle = Some(LifeCycle(Permanent))
|
||||
|
||||
val reconnectionTimer = new Timer
|
||||
|
||||
var connection: Connection = _
|
||||
|
|
|
|||
|
|
@ -35,7 +35,9 @@ object Config extends Logging {
|
|||
Configgy.configure(configFile)
|
||||
log.info("AKKA_HOME is defined to [%s], config loaded from [%s].", HOME.get, configFile)
|
||||
} catch {
|
||||
case e: ParseException => throw new IllegalStateException("'akka.conf' config file can not be found in [" + HOME + "/config/akka.conf] - aborting. Either add it in the 'config' directory or add it to the classpath.")
|
||||
case e: ParseException => throw new IllegalStateException(
|
||||
"'akka.conf' config file can not be found in [" + HOME + "/config/akka.conf] aborting." +
|
||||
"\n\tEither add it in the 'config' directory or add it to the classpath.")
|
||||
}
|
||||
} else if (System.getProperty("akka.config", "") != "") {
|
||||
val configFile = System.getProperty("akka.config", "")
|
||||
|
|
@ -43,21 +45,26 @@ object Config extends Logging {
|
|||
Configgy.configure(configFile)
|
||||
log.info("Config loaded from -Dakka.config=%s", configFile)
|
||||
} catch {
|
||||
case e: ParseException => throw new IllegalStateException("Config could not be loaded from -Dakka.config=" + configFile)
|
||||
case e: ParseException => throw new IllegalStateException(
|
||||
"Config could not be loaded from -Dakka.config=" + configFile)
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
|
||||
log.info("Config loaded from the application classpath.")
|
||||
} catch {
|
||||
case e: ParseException => throw new IllegalStateException("'$AKKA_HOME/config/akka.conf' could not be found and no 'akka.conf' can be found on the classpath - aborting. . Either add it in the '$AKKA_HOME/config' directory or add it to the classpath.")
|
||||
case e: ParseException => throw new IllegalStateException(
|
||||
"'$AKKA_HOME/config/akka.conf' could not be found" +
|
||||
"\n\tand no 'akka.conf' can be found on the classpath - aborting." +
|
||||
"\n\tEither add it in the '$AKKA_HOME/config' directory or add it to the classpath.")
|
||||
}
|
||||
}
|
||||
Configgy.config
|
||||
}
|
||||
|
||||
val CONFIG_VERSION = config.getString("akka.version", "0")
|
||||
if (VERSION != CONFIG_VERSION) throw new IllegalStateException("Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]")
|
||||
if (VERSION != CONFIG_VERSION) throw new IllegalStateException(
|
||||
"Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]")
|
||||
val startTime = System.currentTimeMillis
|
||||
|
||||
def uptime = (System.currentTimeMillis - startTime) / 1000
|
||||
|
|
|
|||
|
|
@ -10,6 +10,11 @@ description
|
|||
</action>
|
||||
see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full guide
|
||||
-->
|
||||
|
||||
<!--action dev="jboner" type="fix" issue="23" system="github">
|
||||
Info message
|
||||
</action-->
|
||||
|
||||
<document>
|
||||
<properties>
|
||||
<title>Akka Release Notes</title>
|
||||
|
|
@ -33,11 +38,13 @@ see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full gui
|
|||
<action dev="Jonas Bonér" type="add">Monadic API to TransactionalRef (use it in for-comprehension)</action>
|
||||
<action dev="Jonas Bonér" type="add">Lightweight actor syntax 'actor { case _ => .. }'</action>
|
||||
<action dev="Jonas Bonér" type="add">New Scala JSON parser based on sjson</action>
|
||||
<action dev="Jonas Bonér" type="add">Upgraded to Netty 3.2 and Protobuf 2.2</action>
|
||||
<action dev="Jonas Bonér" type="add">Monadic API to TransactionalRef (use it in for-comprehension)</action>
|
||||
<action dev="Jonas Bonér" type="add">Smoother web app integration; just add akka.conf to WEB-INF/classes, no need for AKKA_HOME</action>
|
||||
<action dev="Jonas Bonér" type="add">Modularization of distribution into a thin core (actors, remoting and STM) and the rest in submodules</action>
|
||||
<action dev="Jonas Bonér" type="add">JSON serialization for Java objects (using Jackson)</action>
|
||||
<action dev="Jonas Bonér" type="add">JSON serialization for Scala objects (using SJSON)</action>
|
||||
<action dev="Jonas Bonér" type="add">Added implementation for remote actor reconnect upon failure</action>
|
||||
<action dev="Jonas Bonér" type="add">Protobuf serialization for Java and Scala objects</action>
|
||||
<action dev="Jonas Bonér" type="add">SBinary serialization for Scala objects</action>
|
||||
<action dev="Jonas Bonér" type="add">Protobuf as remote protocol</action>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue