diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala index dd14cda0a5..983d323148 100644 --- a/akka-actors/src/main/scala/actor/ActiveObject.scala +++ b/akka-actors/src/main/scala/actor/ActiveObject.scala @@ -160,6 +160,8 @@ private[akka] sealed case class AspectInit( */ @Aspect("perInstance") private[akka] sealed class ActiveObjectAspect { + import Actor._ + @volatile var isInitialized = false var target: Class[_] = _ var actor: Dispatcher = _ diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 4f63d8aa26..de1cb5fba6 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -23,15 +23,22 @@ import org.codehaus.aspectwerkz.proxy.Uuid import org.multiverse.utils.ThreadLocalTransaction._ -@serializable sealed abstract class LifeCycleMessage +/** + * Mix in this trait to give an actor TransactionRequired semantics. + * Equivalent to invoking the 'makeTransactionRequired' method in the actor. + */ +trait TransactionRequired { this: Actor => + makeTransactionRequired +} + +@serializable sealed trait LifeCycleMessage case class Init(config: AnyRef) extends LifeCycleMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage case class Restart(reason: AnyRef) extends LifeCycleMessage case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage -case class Kill(killer: Actor) extends LifeCycleMessage -//case object TransactionalInit extends LifeCycleMessage +case object Kill extends LifeCycleMessage -class ActorKilledException(val killed: Actor, val killer: Actor) extends RuntimeException("Actor [" + killed + "] killed by [" + killer + "]") +class ActorKilledException(val killed: Actor) extends RuntimeException("Actor [" + killed + "] was killed by a Kill message") sealed abstract class DispatcherType object DispatcherType { @@ -55,13 +62,21 @@ object Actor { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) + implicit val any: AnyRef = this + def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() { start def receive = body } + def actor(body: => Unit)(matcher: PartialFunction[Any, Unit]): Actor = new Actor() { + start + body + def receive = matcher + } + def actor(lifeCycleConfig: LifeCycle)(body: PartialFunction[Any, Unit]): Actor = new Actor() { - lifeCycle = Some(lifeCycleConfig) + lifeCycle = lifeCycleConfig start def receive = body } @@ -82,12 +97,17 @@ object Actor { */ trait Actor extends Logging with TransactionManagement { ActorRegistry.register(this) + + implicit val self: AnyRef = this // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait val uuid = Uuid.newUuid.toString + // ==================================== // private fields - @volatile private var _isRunning: Boolean = false + // ==================================== + + @volatile private var _isRunning: Boolean = false @volatile private var _isShutDown: Boolean = false private var _hotswap: Option[PartialFunction[Any, Unit]] = None private var _config: Option[AnyRef] = None @@ -97,12 +117,28 @@ trait Actor extends Logging with TransactionManagement { private[akka] var _mailbox: MessageQueue = _ private[akka] var _supervisor: Option[Actor] = None + // ==================================== + // protected fields + // ==================================== + /** - * This field should normally not be touched by user code, which should instead use the 'reply' method. + * The 'sender' field holds the sender of the message currently being processed. + *

+ * If the sender was an actor then it is defined as 'Some(senderActor)' and + * if the sender was of some other instance then it is defined as 'None'. + *

+ * This sender reference can be used together with the '!' method for request/reply + * message exchanges and which is in many ways better than using the '!!' method + * which will make the sender wait for a reply using a *blocking* future. + */ + protected[this] var sender: Option[Actor] = None + + /** + * The 'senderFuture' field should normally not be touched by user code, which should instead use the 'reply' method. * But it can be used for advanced use-cases when one might want to store away the future and * resolve it later and/or somewhere else. */ - protected var senderFuture: Option[CompletableFutureResult] = None + protected[this] var senderFuture: Option[CompletableFutureResult] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -127,30 +163,19 @@ trait Actor extends Logging with TransactionManagement { /** * User overridable callback/setting. - * - * User can (and is encouraged to) override the default configuration (newEventBasedThreadPoolDispatcher) - * so it fits the specific use-case that the actor is used for. *

- * It is beneficial to have actors share the same dispatcher, easily +100 actors can share the same. + * The default dispatcher is the Dispatchers.globalEventBasedThreadPoolDispatcher. + * This means that all actors will share the same event-driven thread-pool based dispatcher. *

- * But if you are running many many actors then it can be a good idea to have split them up in terms of - * dispatcher sharing. + * You can override it so it fits the specific use-case that the actor is used for. + * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different + * dispatchers available. *

- * Default is that all actors that are created and spawned from within this actor is sharing the same - * dispatcher as its creator. - *

-   *   val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
-   *   dispatcher
-   *     .withNewThreadPoolWithBoundedBlockingQueue(100)
-   *     .setCorePoolSize(16)
-   *     .setMaxPoolSize(128)
-   *     .setKeepAliveTimeInMillis(60000)
-   *     .setRejectionPolicy(new CallerRunsPolicy)
-   *     .buildThreadPool
-   * 
+ * The default is also that all actors that are created and spawned from within this actor + * is sharing the same dispatcher as its creator. */ protected[akka] var messageDispatcher: MessageDispatcher = { - val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName) + val dispatcher = Dispatchers.globalEventBasedThreadPoolDispatcher _mailbox = dispatcher.messageQueue dispatcher } @@ -190,7 +215,7 @@ trait Actor extends Logging with TransactionManagement { * * Defines the life-cycle for a supervised actor. Default is 'LifeCycle(Permanent)' but can be overridden. */ - @volatile var lifeCycle: Option[LifeCycle] = Some(LifeCycle(Permanent)) + @volatile var lifeCycle: LifeCycle = LifeCycle(Permanent) /** * User overridable callback/setting. @@ -270,7 +295,7 @@ trait Actor extends Logging with TransactionManagement { /** * Starts up the actor and its message queue. */ - def start = synchronized { + def start: Actor = synchronized { if (_isShutDown) throw new IllegalStateException("Can't restart an actor that have been shut down with 'exit'") if (!_isRunning) { dispatcher.registerHandler(this, new ActorMessageInvoker(this)) @@ -279,6 +304,7 @@ trait Actor extends Logging with TransactionManagement { //if (isTransactional) this !! TransactionalInit } log.info("[%s] has started", toString) + this } /** @@ -303,10 +329,28 @@ trait Actor extends Logging with TransactionManagement { /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + *

+ * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument. + * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable. + *

+ * If invoked from within another object then add this import to resolve the implicit argument: + *

+   * import se.scalablesolutions.akka.actor.Actor._
+   * 
*/ - def !(message: AnyRef) = - if (_isRunning) postMessageToMailbox(message) - else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + def !(message: AnyRef)(implicit sender: AnyRef) = { + val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) + else None + if (_isRunning) postMessageToMailbox(message, from) + else throw new IllegalStateException( + "Actor has not been started, you need to invoke 'actor.start' before using it") + } + + def send(message: AnyRef) = { + if (_isRunning) postMessageToMailbox(message, None) + else throw new IllegalStateException( + "Actor has not been started, you need to invoke 'actor.start' before using it") + } /** * Sends a message asynchronously and waits on a future for a reply message. @@ -315,6 +359,8 @@ trait Actor extends Logging with TransactionManagement { * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics. *

* NOTE: + * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to + * implement request/response message exchanges. * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ @@ -330,7 +376,8 @@ trait Actor extends Logging with TransactionManagement { else None } getResultOrThrowException(future) - } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + } else throw new IllegalStateException( + "Actor has not been started, you need to invoke 'actor.start' before using it") /** * Sends a message asynchronously and waits on a future for a reply message. @@ -339,6 +386,8 @@ trait Actor extends Logging with TransactionManagement { * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics. *

* NOTE: + * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to + * implement request/response message exchanges. * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ @@ -354,22 +403,31 @@ trait Actor extends Logging with TransactionManagement { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0) future.awaitBlocking getResultOrThrowException(future).get - } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + } else throw new IllegalStateException( + "Actor has not been started, you need to invoke 'actor.start' before using it") /** * Use reply(..) to reply with a message to the original sender of the message currently * being processed. - *

- * NOTE: - * Does only work together with the actor !! method and/or active objects not annotated - * with @oneway. */ - protected[this] def reply(message: AnyRef) = senderFuture match { - case None => throw new IllegalStateException( - "\n\tNo sender in scope, can't reply. " + - "\n\tHave you used the '!' message send or the '@oneway' active object annotation? " + - "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future that will be bound by the argument passed to 'reply'." ) - case Some(future) => future.completeWithResult(message) + protected[this] def reply(message: AnyRef) = { + sender match { + case Some(senderActor) => + senderActor ! message + case None => + senderFuture match { + case None => + throw new IllegalStateException( + "\n\tNo sender in scope, can't reply. " + + "\n\tYou have probably used the '!' method to either; " + + "\n\t\t1. Send a message to a remote actor" + + "\n\t\t2. Send a message from an instance that is *not* an actor" + + "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + + "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future that will be bound by the argument passed to 'reply'." ) + case Some(future) => + future.completeWithResult(message) + } + } } /** @@ -385,7 +443,8 @@ trait Actor extends Logging with TransactionManagement { messageDispatcher = dispatcher _mailbox = messageDispatcher.messageQueue messageDispatcher.registerHandler(this, new ActorMessageInvoker(this)) - } else throw new IllegalArgumentException("Can not swap dispatcher for " + toString + " after it has been started") + } else throw new IllegalArgumentException( + "Can not swap dispatcher for " + toString + " after it has been started") } /** @@ -411,7 +470,8 @@ trait Actor extends Logging with TransactionManagement { * */ def makeTransactionRequired = synchronized { - if (_isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started") + if (_isRunning) throw new IllegalArgumentException( + "Can not make actor transaction required after it has been started") else isTransactionRequiresNew = true } @@ -428,10 +488,12 @@ trait Actor extends Logging with TransactionManagement { protected[this] def link(actor: Actor) = { if (_isRunning) { _linkedActors.add(actor) - if (actor._supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") + if (actor._supervisor.isDefined) throw new IllegalStateException( + "Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") actor._supervisor = Some(this) log.debug("Linking actor [%s] to actor [%s]", actor, this) - } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + } else throw new IllegalStateException( + "Actor has not been started, you need to invoke 'actor.start' before using it") } /** @@ -445,7 +507,8 @@ trait Actor extends Logging with TransactionManagement { _linkedActors.remove(actor) actor._supervisor = None log.debug("Unlinking actor [%s] from actor [%s]", actor, this) - } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + } else throw new IllegalStateException( + "Actor has not been started, you need to invoke 'actor.start' before using it") } /** @@ -527,7 +590,7 @@ trait Actor extends Logging with TransactionManagement { // ==== IMPLEMENTATION DETAILS ==== // ================================ - private def postMessageToMailbox(message: AnyRef): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime + private def postMessageToMailbox(message: AnyRef, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) @@ -541,7 +604,7 @@ trait Actor extends Logging with TransactionManagement { RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build) } else { - val handle = new MessageInvocation(this, message, None, currentTransaction.get) + val handle = new MessageInvocation(this, message, None, sender, currentTransaction.get) handle.send } } @@ -560,10 +623,11 @@ trait Actor extends Logging with TransactionManagement { if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build) if (future.isDefined) future.get - else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + else throw new IllegalStateException( + "Expected a future from remote call to actor " + toString) } else { val future = new DefaultCompletableFutureResult(timeout) - val handle = new MessageInvocation(this, message, Some(future), currentTransaction.get) + val handle = new MessageInvocation(this, message, Some(future), None, currentTransaction.get) handle.send future } @@ -581,16 +645,17 @@ trait Actor extends Logging with TransactionManagement { setTransaction(messageHandle.tx) val message = messageHandle.message //serializeMessage(messageHandle.message) - val future = messageHandle.future + senderFuture = messageHandle.future + sender = messageHandle.sender + try { - senderFuture = future if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) } catch { case e => // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) - if (future.isDefined) future.get.completeWithException(this, e) + if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e) else e.printStackTrace } finally { clearTransaction @@ -601,23 +666,25 @@ trait Actor extends Logging with TransactionManagement { setTransaction(messageHandle.tx) val message = messageHandle.message //serializeMessage(messageHandle.message) - val future = messageHandle.future + senderFuture = messageHandle.future + sender = messageHandle.sender def proceed = { try { incrementTransaction if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException("Actor " + toString + " could not process message [" + message + "] since no matching 'case' clause in its 'receive' method could be found") + else throw new IllegalArgumentException( + "Actor " + toString + " could not process message [" + message + "]" + + "\n\tsince no matching 'case' clause in its 'receive' method could be found") } finally { decrementTransaction } } try { - senderFuture = future if (isTransactionRequiresNew && !isTransactionInScope) { if (senderFuture.isEmpty) throw new StmException( - "\n\tCan't continue transaction in a one-way fire-forget message send" + + "Can't continue transaction in a one-way fire-forget message send" + "\n\tE.g. using Actor '!' method or Active Object 'void' method" + "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type") atomic { @@ -627,12 +694,9 @@ trait Actor extends Logging with TransactionManagement { } catch { case e => e.printStackTrace - - if (future.isDefined) future.get.completeWithException(this, e) + if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e) else e.printStackTrace - clearTransaction // need to clear currentTransaction before call to supervisor - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) } finally { @@ -651,8 +715,7 @@ trait Actor extends Logging with TransactionManagement { case HotSwap(code) => _hotswap = code case Restart(reason) => restart(reason) case Exit(dead, reason) => handleTrapExit(dead, reason) - case Kill(killer) => throw new ActorKilledException(this, killer) -// case TransactionalInit => initTransactionalState + case Kill => throw new ActorKilledException(this) } private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = { @@ -663,7 +726,9 @@ trait Actor extends Logging with TransactionManagement { case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason) case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason) } - } else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' member field set to non-empty list of exception classes - can't proceed " + toString) + } else throw new IllegalStateException( + "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + + "\n\tto non-empty list of exception classes - can't proceed " + toString) } else { if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on } @@ -672,8 +737,7 @@ trait Actor extends Logging with TransactionManagement { private[this] def restartLinkedActors(reason: AnyRef) = { _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.lifeCycle match { - case None => throw new IllegalStateException("Actor [" + actor.id + "] does not have a life-cycle defined.") - case Some(LifeCycle(scope, _)) => { + case LifeCycle(scope, _) => { scope match { case Permanent => actor.restart(reason) diff --git a/akka-actors/src/main/scala/actor/Scheduler.scala b/akka-actors/src/main/scala/actor/Scheduler.scala index 6266c17942..9b9ee8bc7e 100644 --- a/akka-actors/src/main/scala/actor/Scheduler.scala +++ b/akka-actors/src/main/scala/actor/Scheduler.scala @@ -28,7 +28,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio * which is licensed under the Apache 2 License. */ class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { - lifeCycle = Some(LifeCycle(Permanent)) + lifeCycle = LifeCycle(Permanent) def receive = { case UnSchedule => diff --git a/akka-actors/src/main/scala/actor/Supervisor.scala b/akka-actors/src/main/scala/actor/Supervisor.scala index 1dadd2cdac..ec65fbfec7 100644 --- a/akka-actors/src/main/scala/actor/Supervisor.scala +++ b/akka-actors/src/main/scala/actor/Supervisor.scala @@ -8,20 +8,10 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.{ConfiguratorRepository, Configurator} import se.scalablesolutions.akka.util.Helpers._ import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.dispatch.Dispatchers import java.util.concurrent.ConcurrentHashMap -/** - * Messages that the supervisor responds to and returns. - * - * @author Jonas Bonér - */ -sealed abstract class SupervisorMessage -case object StartSupervisor extends SupervisorMessage -case object StopSupervisor extends SupervisorMessage -case class ConfigureSupervisor(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage -case object ConfigSupervisorSuccess extends SupervisorMessage - sealed abstract class FaultHandlingStrategy case class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy @@ -93,7 +83,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy) extends Actor with Logging with Configurator { trapExit = List(classOf[Throwable]) faultHandler = Some(handler) - //dispatcher = Dispatchers.newThreadBasedDispatcher(this) + dispatcher = Dispatchers.newThreadBasedDispatcher(this) val actors = new ConcurrentHashMap[String, Actor] @@ -103,9 +93,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy) def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName) - override def start = { + override def start = synchronized { ConfiguratorRepository.registerConfigurator(this) - actors.values.toArray.toList.foreach(println) _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start log.info("Starting actor: %s", actor) @@ -113,7 +102,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy) super[Actor].start } - override def stop = { + override def stop = synchronized { super[Actor].stop _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop @@ -123,21 +112,20 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy) } def receive = { - case _ => throw new IllegalArgumentException("Supervisor does not respond to any messages") + case unknown => throw new IllegalArgumentException("Supervisor does not respond to any messages. Unknown message [" + unknown + "]") } def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { case SupervisorConfig(_, servers) => servers.map(server => server match { - case Supervise(actor, lifecycle) => + case Supervise(actor, lifeCycle) => actors.put(actor.getClass.getName, actor) - actor.lifeCycle = Some(lifecycle) + actor.lifeCycle = lifeCycle startLink(actor) case SupervisorConfig(_, _) => // recursive configuration - val supervisor = factory.newInstanceFor(server.asInstanceOf[SupervisorConfig]) - supervisor ! StartSupervisor + factory.newInstanceFor(server.asInstanceOf[SupervisorConfig]).start // FIXME what to do with recursively supervisors? }) } diff --git a/akka-actors/src/main/scala/dispatch/Dispatchers.scala b/akka-actors/src/main/scala/dispatch/Dispatchers.scala index ed4bd6d704..a82191af1a 100644 --- a/akka-actors/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actors/src/main/scala/dispatch/Dispatchers.scala @@ -39,7 +39,9 @@ import se.scalablesolutions.akka.actor.Actor * @author Jonas Bonér */ object Dispatchers { - + + object globalEventBasedThreadPoolDispatcher extends EventBasedThreadPoolDispatcher("global:eventbased:dispatcher") + /** * Creates an event based dispatcher serving multiple (millions) of actors through a thread pool. * Has a fluent builder interface for configuring its semantics. diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala index 36be55a391..38c311097d 100644 --- a/akka-actors/src/main/scala/dispatch/Reactor.scala +++ b/akka-actors/src/main/scala/dispatch/Reactor.scala @@ -39,6 +39,7 @@ trait MessageDemultiplexer { class MessageInvocation(val receiver: Actor, val message: AnyRef, val future: Option[CompletableFutureResult], + val sender: Option[Actor], val tx: Option[Transaction]) { if (receiver == null) throw new IllegalArgumentException("receiver is null") if (message == null) throw new IllegalArgumentException("message is null") @@ -65,6 +66,10 @@ class MessageInvocation(val receiver: Actor, } override def toString(): String = synchronized { - "MessageInvocation[message = " + message + ", receiver = " + receiver + ", future = " + future + ", tx = " + tx + "]" + "MessageInvocation[message = " + message + + ", receiver = " + receiver + + ", sender = " + sender + + ", future = " + future + + ", tx = " + tx + "]" } } diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala index c25782fe31..4e1a777181 100644 --- a/akka-actors/src/main/scala/nio/RemoteClient.scala +++ b/akka-actors/src/main/scala/nio/RemoteClient.scala @@ -155,6 +155,7 @@ class RemoteClientHandler(val name: String, } override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { + import Actor._ try { val result = event.getMessage if (result.isInstanceOf[RemoteReply]) { diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala index 0c89ac6246..635b231e52 100755 --- a/akka-actors/src/main/scala/nio/RemoteServer.scala +++ b/akka-actors/src/main/scala/nio/RemoteServer.scala @@ -113,6 +113,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL } private def dispatchToActor(request: RemoteRequest, channel: Channel) = { + import Actor._ log.debug("Dispatching to remote actor [%s]", request.getTarget) val actor = createActor(request.getTarget, request.getTimeout) actor.start diff --git a/akka-actors/src/main/scala/stm/DataFlowVariable.scala b/akka-actors/src/main/scala/stm/DataFlowVariable.scala index ec47f0983f..44a40f50af 100644 --- a/akka-actors/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-actors/src/main/scala/stm/DataFlowVariable.scala @@ -24,11 +24,11 @@ object DataFlow { thread } - def thread[MessageType, ReturnType](body: MessageType => ReturnType) = + def thread[MessageType, ReturnType](body: MessageType => ReturnType) = new ReactiveEventBasedThread(body).start private class IsolatedEventBasedThread(body: => Unit) extends Actor { - def act = loop { + def act = loop { react { case 'start => body case 'exit => exit() @@ -37,7 +37,7 @@ object DataFlow { } private class ReactiveEventBasedThread[MessageType, ReturnType](body: MessageType => ReturnType) extends Actor { - def act = loop { + def act = loop { react { case 'exit => exit() case message => sender ! body(message.asInstanceOf[MessageType]) @@ -48,7 +48,7 @@ object DataFlow { /** * @author Jonas Bonér */ - sealed class DataFlowVariable[T] { + sealed class DataFlowVariable[T] { private sealed abstract class DataFlowVariableMessage private case class Set[T](value: T) extends DataFlowVariableMessage @@ -73,7 +73,7 @@ object DataFlow { private class Out[T](dataFlow: DataFlowVariable[T]) extends Actor { var reader: Option[OutputChannel[Any]] = None def act = loop { react { - case Get => + case Get => val ref = dataFlow.value.get if (ref.isDefined) reply(ref.get) else reader = Some(sender) case Set(v) => if (reader.isDefined) reader.get ! v @@ -380,5 +380,3 @@ object Test5 extends Application { //System.gc } - - diff --git a/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala b/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala new file mode 100644 index 0000000000..826f157593 --- /dev/null +++ b/akka-actors/src/test/scala/ActorFireForgetRequestReplyTest.scala @@ -0,0 +1,51 @@ +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +object state { + var s = "NIL" +} + +class ReplyActor extends Actor { + def receive = { + case "Send" => reply("Reply") + case "SendImplicit" => sender.get ! "ReplyImplicit" + } +} + +class SenderActor(replyActor: Actor) extends Actor { + def receive = { + case "Init" => replyActor ! "Send" + case "Reply" => state.s = "Reply" + case "InitImplicit" => replyActor ! "SendImplicit" + case "ReplyImplicit" => state.s = "ReplyImplicit" + } +} + +class ActorFireForgetRequestReplyTest extends JUnitSuite { + + @Test + def shouldReplyToBangMessageUsingReply = { + import Actor._ + val replyActor = new ReplyActor + replyActor.start + val senderActor = new SenderActor(replyActor) + senderActor.start + senderActor ! "Init" + Thread.sleep(10000) + assert("Reply" === state.s) + } + + @Test + def shouldReplyToBangMessageUsingImplicitSender = { + import Actor._ + val replyActor = new ReplyActor + replyActor.start + val senderActor = new SenderActor(replyActor) + senderActor.start + senderActor ! "InitImplicit" + Thread.sleep(10000) + assert("ReplyImplicit" === state.s) + } +} diff --git a/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala b/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala index 67fbf89da8..bd7389e676 100644 --- a/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala +++ b/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala @@ -8,6 +8,7 @@ import org.junit.Test import se.scalablesolutions.akka.dispatch.Dispatchers class EventBasedSingleThreadActorTest extends JUnitSuite { + import Actor._ private val unit = TimeUnit.MILLISECONDS class TestActor extends Actor { diff --git a/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala b/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala index 2997c3b4f5..649d95f4d2 100644 --- a/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala +++ b/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala @@ -61,7 +61,7 @@ class EventBasedSingleThreadDispatcherTest extends JUnitSuite { dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None, None)) } assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) @@ -73,8 +73,8 @@ class EventBasedSingleThreadDispatcherTest extends JUnitSuite { dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch)) dispatcher.start - dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, new Object, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, new Object, None, None, None)) assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) } @@ -106,8 +106,8 @@ class EventBasedSingleThreadDispatcherTest extends JUnitSuite { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None, None)) } assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) diff --git a/akka-actors/src/test/scala/EventBasedThreadPoolActorTest.scala b/akka-actors/src/test/scala/EventBasedThreadPoolActorTest.scala index e82784cc67..55345ec25e 100644 --- a/akka-actors/src/test/scala/EventBasedThreadPoolActorTest.scala +++ b/akka-actors/src/test/scala/EventBasedThreadPoolActorTest.scala @@ -6,6 +6,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test class EventBasedThreadPoolActorTest extends JUnitSuite { + import Actor._ private val unit = TimeUnit.MILLISECONDS class TestActor extends Actor { diff --git a/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala index 84d778a39d..f8c0107d05 100644 --- a/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala +++ b/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala @@ -67,7 +67,7 @@ class EventBasedThreadPoolDispatcherTest extends JUnitSuite { }) dispatcher.start for (i <- 0 until 10) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None, None)) } assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) @@ -109,10 +109,10 @@ class EventBasedThreadPoolDispatcherTest extends JUnitSuite { } }) dispatcher.start - dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None, None)) handlersBarrier.await(5, TimeUnit.SECONDS) assert(!threadingIssueDetected.get) @@ -151,8 +151,8 @@ class EventBasedThreadPoolDispatcherTest extends JUnitSuite { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None)) - dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new java.lang.Integer(i), None, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key2, new java.lang.Integer(i), None, None, None)) } assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) diff --git a/akka-actors/src/test/scala/RemoteActorTest.scala b/akka-actors/src/test/scala/RemoteActorTest.scala index 1b6cb5bd76..bea33a50d9 100644 --- a/akka-actors/src/test/scala/RemoteActorTest.scala +++ b/akka-actors/src/test/scala/RemoteActorTest.scala @@ -27,6 +27,7 @@ class RemoteActorSpecActorBidirectional extends Actor { } class RemoteActorTest extends JUnitSuite { + import Actor._ akka.Config.config new Thread(new Runnable() { def run = { diff --git a/akka-actors/src/test/scala/RemoteSupervisorTest.scala b/akka-actors/src/test/scala/RemoteSupervisorTest.scala index 2150707fac..9405af6dc9 100644 --- a/akka-actors/src/test/scala/RemoteSupervisorTest.scala +++ b/akka-actors/src/test/scala/RemoteSupervisorTest.scala @@ -20,6 +20,7 @@ object Log { * @author Jonas Bonér */ class RemoteSupervisorTest extends JUnitSuite { + import Actor._ akka.Config.config new Thread(new Runnable() { def run = { @@ -35,7 +36,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldStartServer = { Log.messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") @@ -45,7 +46,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldKillSingleActorOneForOne = { Log.messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong1 !! BinaryString("Die") @@ -59,7 +60,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldCallKillCallSingleActorOneForOne = { Log.messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") @@ -87,7 +88,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldKillSingleActorAllForOne = { Log.messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong1 !! BinaryString("Die") @@ -101,7 +102,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldCallKillCallSingleActorAllForOne = { Log.messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") @@ -129,7 +130,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldKillMultipleActorsOneForOne = { Log.messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong3 !! BinaryString("Die") @@ -143,7 +144,7 @@ class RemoteSupervisorTest extends JUnitSuite { def tesCallKillCallMultipleActorsOneForOne = { Log.messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") @@ -187,7 +188,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldKillMultipleActorsAllForOne = { Log.messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong2 !! BinaryString("Die") @@ -201,7 +202,7 @@ class RemoteSupervisorTest extends JUnitSuite { def tesCallKillCallMultipleActorsAllForOne = { Log.messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") @@ -246,7 +247,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldOneWayKillSingleActorOneForOne = { Logg.messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) pingpong1 ! BinaryString("Die") Thread.sleep(500) @@ -258,7 +259,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldOneWayCallKillCallSingleActorOneForOne = { Logg.messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) pingpong1 ! OneWay Thread.sleep(500) @@ -282,7 +283,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldOneWayKillSingleActorAllForOne = { Logg.messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong1 ! BinaryString("Die") @@ -296,7 +297,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldOneWayCallKillCallSingleActorAllForOne = { Logg.messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 ! BinaryString("Ping")).getOrElse("nil") @@ -324,7 +325,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldOneWayKillMultipleActorsOneForOne = { Logg.messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong3 ! BinaryString("Die") @@ -338,7 +339,7 @@ class RemoteSupervisorTest extends JUnitSuite { def tesOneWayCallKillCallMultipleActorsOneForOne = { Logg.messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 ! BinaryString("Ping")).getOrElse("nil") @@ -382,7 +383,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldOneWayKillMultipleActorsAllForOne = { Logg.messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong2 ! BinaryString("Die") @@ -396,7 +397,7 @@ class RemoteSupervisorTest extends JUnitSuite { def tesOneWayCallKillCallMultipleActorsAllForOne = { Logg.messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { pingpong1 ! BinaryString("Ping") @@ -442,7 +443,7 @@ class RemoteSupervisorTest extends JUnitSuite { @Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = { Logg.messageLog = "" val sup = getNestedSupervisorsAllForOneConf - sup ! StartSupervisor + sup.start intercept[RuntimeException] { pingpong1 !! BinaryString("Die") } diff --git a/akka-actors/src/test/scala/SupervisorTest.scala b/akka-actors/src/test/scala/SupervisorTest.scala index f2a98727d6..df59ee832c 100644 --- a/akka-actors/src/test/scala/SupervisorTest.scala +++ b/akka-actors/src/test/scala/SupervisorTest.scala @@ -13,6 +13,7 @@ import org.junit.Test * @author Jonas Bonér */ class SupervisorTest extends JUnitSuite { + import Actor._ var messageLog: String = "" var oneWayLog: String = "" @@ -24,7 +25,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldStartServer = { messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start expect("pong") { (pingpong1 !! Ping).getOrElse("nil") @@ -34,7 +35,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldKillSingleActorOneForOne = { messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong1 !! Die @@ -48,7 +49,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldCallKillCallSingleActorOneForOne = { messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") @@ -76,7 +77,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldKillSingleActorAllForOne = { messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong1 !! Die @@ -90,7 +91,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldCallKillCallSingleActorAllForOne = { messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") @@ -118,7 +119,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldKillMultipleActorsOneForOne = { messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong3 !! Die @@ -132,7 +133,7 @@ class SupervisorTest extends JUnitSuite { def tesCallKillCallMultipleActorsOneForOne = { messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") @@ -176,7 +177,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldKillMultipleActorsAllForOne = { messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong2 !! Die @@ -190,7 +191,7 @@ class SupervisorTest extends JUnitSuite { def tesCallKillCallMultipleActorsAllForOne = { messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") @@ -234,7 +235,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldOneWayKillSingleActorOneForOne = { messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) pingpong1 ! Die Thread.sleep(500) @@ -246,7 +247,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldOneWayCallKillCallSingleActorOneForOne = { messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) pingpong1 ! OneWay Thread.sleep(500) @@ -269,7 +270,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldOneWayKillSingleActorAllForOne = { messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong1 ! Die @@ -283,7 +284,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldOneWayCallKillCallSingleActorAllForOne = { messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 ! Ping).getOrElse("nil") @@ -311,7 +312,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldOneWayKillMultipleActorsOneForOne = { messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong3 ! Die @@ -325,7 +326,7 @@ class SupervisorTest extends JUnitSuite { def tesOneWayCallKillCallMultipleActorsOneForOne = { messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { (pingpong1 ! Ping).getOrElse("nil") @@ -369,7 +370,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldOneWayKillMultipleActorsAllForOne = { messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) intercept[RuntimeException] { pingpong2 ! Die @@ -383,7 +384,7 @@ class SupervisorTest extends JUnitSuite { def tesOneWayCallKillCallMultipleActorsAllForOne = { messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! StartSupervisor + sup.start Thread.sleep(500) expect("pong") { pingpong1 ! Ping @@ -429,7 +430,7 @@ class SupervisorTest extends JUnitSuite { @Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = { messageLog = "" val sup = getNestedSupervisorsAllForOneConf - sup ! StartSupervisor + sup.start intercept[RuntimeException] { pingpong1 !! Die } diff --git a/akka-actors/src/test/scala/ThreadBasedActorTest.scala b/akka-actors/src/test/scala/ThreadBasedActorTest.scala index e39b9322f1..dca6634acc 100644 --- a/akka-actors/src/test/scala/ThreadBasedActorTest.scala +++ b/akka-actors/src/test/scala/ThreadBasedActorTest.scala @@ -8,6 +8,7 @@ import org.junit.Test import se.scalablesolutions.akka.dispatch.Dispatchers class ThreadBasedActorTest extends JUnitSuite { + import Actor._ private val unit = TimeUnit.MILLISECONDS class TestActor extends Actor { diff --git a/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala b/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala index 1495257a7f..b434762b37 100644 --- a/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala +++ b/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala @@ -57,7 +57,7 @@ class ThreadBasedDispatcherTest extends JUnitSuite { val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None, None)) } assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) @@ -78,7 +78,7 @@ class ThreadBasedDispatcherTest extends JUnitSuite { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None, None)) } assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 3143f41ee6..9f591f9891 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -285,7 +285,7 @@ object AMQP extends Actor { val passive: Boolean, val durable: Boolean, val configurationArguments: Map[java.lang.String, Object]) - extends FaultTolerantConnectionActor { self: Consumer => + extends FaultTolerantConnectionActor { consumer: Consumer => faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Throwable]) @@ -370,7 +370,7 @@ object AMQP extends Actor { } catch { case cause => log.error("Delivery of message to MessageConsumerListener [%s] failed due to [%s]", listener.toString(exchangeName), cause.toString) - self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect + consumer ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect } } @@ -384,7 +384,7 @@ object AMQP extends Actor { case Some(listener) => log.warning("MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]", listener.toString(exchangeName), signal.getReference, signal.getReason) - self ! UnregisterMessageConsumerListener(listener) + consumer ! UnregisterMessageConsumerListener(listener) } } }) @@ -422,8 +422,6 @@ object AMQP extends Actor { } trait FaultTolerantConnectionActor extends Actor { - lifeCycle = Some(LifeCycle(Permanent)) - val reconnectionTimer = new Timer var connection: Connection = _ diff --git a/akka-util/src/main/scala/Config.scala b/akka-util/src/main/scala/Config.scala index 0359a3a58a..e3e82c7af9 100644 --- a/akka-util/src/main/scala/Config.scala +++ b/akka-util/src/main/scala/Config.scala @@ -35,7 +35,9 @@ object Config extends Logging { Configgy.configure(configFile) log.info("AKKA_HOME is defined to [%s], config loaded from [%s].", HOME.get, configFile) } catch { - case e: ParseException => throw new IllegalStateException("'akka.conf' config file can not be found in [" + HOME + "/config/akka.conf] - aborting. Either add it in the 'config' directory or add it to the classpath.") + case e: ParseException => throw new IllegalStateException( + "'akka.conf' config file can not be found in [" + HOME + "/config/akka.conf] aborting." + + "\n\tEither add it in the 'config' directory or add it to the classpath.") } } else if (System.getProperty("akka.config", "") != "") { val configFile = System.getProperty("akka.config", "") @@ -43,21 +45,26 @@ object Config extends Logging { Configgy.configure(configFile) log.info("Config loaded from -Dakka.config=%s", configFile) } catch { - case e: ParseException => throw new IllegalStateException("Config could not be loaded from -Dakka.config=" + configFile) + case e: ParseException => throw new IllegalStateException( + "Config could not be loaded from -Dakka.config=" + configFile) } } else { try { Configgy.configureFromResource("akka.conf", getClass.getClassLoader) log.info("Config loaded from the application classpath.") } catch { - case e: ParseException => throw new IllegalStateException("'$AKKA_HOME/config/akka.conf' could not be found and no 'akka.conf' can be found on the classpath - aborting. . Either add it in the '$AKKA_HOME/config' directory or add it to the classpath.") + case e: ParseException => throw new IllegalStateException( + "'$AKKA_HOME/config/akka.conf' could not be found" + + "\n\tand no 'akka.conf' can be found on the classpath - aborting." + + "\n\tEither add it in the '$AKKA_HOME/config' directory or add it to the classpath.") } } Configgy.config } val CONFIG_VERSION = config.getString("akka.version", "0") - if (VERSION != CONFIG_VERSION) throw new IllegalStateException("Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]") + if (VERSION != CONFIG_VERSION) throw new IllegalStateException( + "Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]") val startTime = System.currentTimeMillis def uptime = (System.currentTimeMillis - startTime) / 1000 diff --git a/changes.xml b/changes.xml index 105309c949..203bd588ac 100644 --- a/changes.xml +++ b/changes.xml @@ -10,6 +10,11 @@ description see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full guide --> + + + Akka Release Notes @@ -33,11 +38,13 @@ see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full gui Monadic API to TransactionalRef (use it in for-comprehension) Lightweight actor syntax 'actor { case _ => .. }' New Scala JSON parser based on sjson + Upgraded to Netty 3.2 and Protobuf 2.2 Monadic API to TransactionalRef (use it in for-comprehension) Smoother web app integration; just add akka.conf to WEB-INF/classes, no need for AKKA_HOME Modularization of distribution into a thin core (actors, remoting and STM) and the rest in submodules JSON serialization for Java objects (using Jackson) JSON serialization for Scala objects (using SJSON) + Added implementation for remote actor reconnect upon failure Protobuf serialization for Java and Scala objects SBinary serialization for Scala objects Protobuf as remote protocol