Rewritten "home" address management and protocol, all test pass except 2

This commit is contained in:
Jonas Bonér 2010-05-16 10:59:06 +02:00
parent dfc45e0a71
commit f7407d3adf
61 changed files with 668 additions and 866 deletions

View file

@ -79,11 +79,12 @@ object AMQP {
*/
class AMQPSupervisor extends Actor with Logging {
import scala.collection.JavaConversions._
import self._
private val connections = new ConcurrentHashMap[ActorRef, ActorRef]
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.trapExit = List(classOf[Throwable])
start
def newProducer(
@ -362,7 +363,8 @@ object AMQP {
extends FaultTolerantConnectionActor {
import scala.collection.JavaConversions._
import self._
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])

View file

@ -5,7 +5,6 @@
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.Actor.Sender.Self
import com.rabbitmq.client.ConnectionParameters

View file

@ -20,7 +20,7 @@ import se.scalablesolutions.akka.util.Logging
* @author Martin Krasser
*/
trait Producer { this: Actor =>
private val headersToCopyDefault = Set(Message.MessageExchangeId)
/**
@ -128,7 +128,7 @@ trait Producer { this: Actor =>
case msg => {
if ( oneway && !async) produceOnewaySync(msg)
else if ( oneway && async) produceOnewayAsync(msg)
else if (!oneway && !async) reply(produceSync(msg))
else if (!oneway && !async) self.reply(produceSync(msg))
else /*(!oneway && async)*/ produceAsync(msg)
}
}

View file

@ -3,7 +3,7 @@ package se.scalablesolutions.akka.camel.component
import org.apache.camel.RuntimeCamelException
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.camel.support.{Respond, Countdown, Tester, Retain}
import se.scalablesolutions.akka.camel.{Message, CamelContextManager}
@ -20,39 +20,40 @@ class ActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with
feature("Communicate with an actor from a Camel application using actor endpoint URIs") {
import CamelContextManager.template
import Actor._
scenario("one-way communication using actor id") {
val actor = new Tester with Retain with Countdown[Message]
val actor = newActor(() => new Tester with Retain with Countdown[Message])
actor.start
template.sendBody("actor:%s" format actor.id, "Martin")
assert(actor.waitFor)
assert(actor.body === "Martin")
assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor)
assert(actor.actor.asInstanceOf[Retain].body === "Martin")
}
scenario("one-way communication using actor uuid") {
val actor = new Tester with Retain with Countdown[Message]
val actor = newActor(() => new Tester with Retain with Countdown[Message])
actor.start
template.sendBody("actor:uuid:%s" format actor.uuid, "Martin")
assert(actor.waitFor)
assert(actor.body === "Martin")
assert(actor.actor.asInstanceOf[Countdown[Message]].waitFor)
assert(actor.actor.asInstanceOf[Retain].body === "Martin")
}
scenario("two-way communication using actor id") {
val actor = new Tester with Respond
val actor = newActor(() => new Tester with Respond)
actor.start
assert(template.requestBody("actor:%s" format actor.id, "Martin") === "Hello Martin")
}
scenario("two-way communication using actor uuid") {
val actor = new Tester with Respond
val actor = newActor(() => new Tester with Respond)
actor.start
assert(template.requestBody("actor:uuid:%s" format actor.uuid, "Martin") === "Hello Martin")
}
scenario("two-way communication with timeout") {
val actor = new Tester {
timeout = 1
}
val actor = newActor(() => new Tester {
self.timeout = 1
})
actor.start
intercept[RuntimeCamelException] {
template.requestBody("actor:uuid:%s" format actor.uuid, "Martin")

View file

@ -61,7 +61,7 @@ class ActorProducerTest extends JUnitSuite with BeforeAndAfterAll {
@Test def shouldSendMessageToActorAndTimeout: Unit = {
val actor = newActor(() => new Tester {
timeout = 1
self.timeout = 1
})
val endpoint = mockEndpoint("actor:uuid:%s" format actor.uuid)
val exchange = endpoint.createExchange(ExchangePattern.InOut)

View file

@ -12,14 +12,14 @@ object CamelServiceFeatureTest {
class TestConsumer(uri: String) extends Actor with Consumer {
def endpointUri = uri
protected def receive = {
case msg: Message => reply("received %s" format msg.body)
case msg: Message => self.reply("received %s" format msg.body)
}
}
class TestActor extends Actor {
id = "custom-actor-id"
self.id = "custom-actor-id"
protected def receive = {
case msg: Message => reply("received %s" format msg.body)
case msg: Message => self.reply("received %s" format msg.body)
}
}

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.camel.Consumer
object PublishTest {
@consume("mock:test1")
class ConsumeAnnotatedActor extends Actor {
id = "test"
self.id = "test"
protected def receive = null
}

View file

@ -9,10 +9,10 @@ trait Receive[T] {
def onMessage(msg: T): Unit
}
trait Respond extends Receive[Message] {self: Actor =>
trait Respond extends Receive[Message] { this: Actor =>
abstract override def onMessage(msg: Message): Unit = {
super.onMessage(msg)
reply(response(msg))
this.self.reply(response(msg))
}
def response(msg: Message): Any = "Hello %s" format msg.body
}

View file

@ -284,7 +284,7 @@ object ActiveObject {
private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = Proxy.newInstance(target, false, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy)
actorRef.actor.timeout = timeout
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout))
actorRef.start
@ -294,7 +294,7 @@ object ActiveObject {
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target)
actorRef.actor.timeout = timeout
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout))
actorRef.start
@ -544,8 +544,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = {
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactionRequired
id = targetClass.getName
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) self.makeTransactionRequired
self.id = targetClass.getName
target = Some(targetInstance)
val methods = targetInstance.getClass.getDeclaredMethods.toList
@ -592,10 +592,10 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
case Invocation(joinPoint, isOneWay, _) =>
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
else reply(joinPoint.proceed)
// Jan Kronquist: started work on issue 121
case Link(target) => link(target)
case Unlink(target) => unlink(target)
else self.reply(joinPoint.proceed)
// Jan Kronquist: started work on issue 121
case Link(target) => self.link(target)
case Unlink(target) => self.unlink(target)
case unexpected =>
throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
}

View file

@ -50,7 +50,7 @@ trait ActorWithNestedReceive extends Actor {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Transactor extends Actor {
makeTransactionRequired
self.makeTransactionRequired
}
/**
@ -61,7 +61,7 @@ trait Transactor extends Actor {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class RemoteActor(hostname: String, port: Int) extends Actor {
makeRemote(hostname, port)
self.makeRemote(hostname, port)
}
// Life-cycle messages for the Actors
@ -85,18 +85,10 @@ class ActorInitializationException private[akka](message: String) extends Runtim
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None)
// FIXME remove next release
object Sender {
@deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'")
object Self
}
/**
* Creates a Actor.newActor out of the Actor with type T.
* <pre>
@ -143,7 +135,7 @@ object Actor extends Logging {
*/
def actor(body: PartialFunction[Any, Unit]): ActorRef =
newActor(() => new Actor() {
lifeCycle = Some(LifeCycle(Permanent))
self.lifeCycle = Some(LifeCycle(Permanent))
def receive: PartialFunction[Any, Unit] = body
}).start
@ -165,7 +157,7 @@ object Actor extends Logging {
*/
def transactor(body: PartialFunction[Any, Unit]): ActorRef =
newActor(() => new Transactor() {
lifeCycle = Some(LifeCycle(Permanent))
self.lifeCycle = Some(LifeCycle(Permanent))
def receive: PartialFunction[Any, Unit] = body
}).start
@ -185,7 +177,7 @@ object Actor extends Logging {
*/
def temporaryActor(body: PartialFunction[Any, Unit]): ActorRef =
newActor(() => new Actor() {
lifeCycle = Some(LifeCycle(Temporary))
self.lifeCycle = Some(LifeCycle(Temporary))
def receive = body
}).start
@ -210,7 +202,7 @@ object Actor extends Logging {
def handler[A](body: => Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) =
newActor(() => new Actor() {
lifeCycle = Some(LifeCycle(Permanent))
self.lifeCycle = Some(LifeCycle(Permanent))
body
def receive = handler
}).start
@ -238,7 +230,7 @@ object Actor extends Logging {
newActor(() => new Actor() {
self ! Spawn
def receive = {
case Spawn => body; stop
case Spawn => body; self.stop
}
}).start
}
@ -258,12 +250,12 @@ object Actor extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Actor extends Logging {
/**
* For internal use only, functions as the implicit sender references when invoking
* one of the message send functions (!, !!, !!! and forward).
*/
protected implicit val _selfSenderRef: Some[ActorRef] = {
implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
Actor.actorRefInCreation.value = None
if (ref.isEmpty) throw new ActorInitializationException(
@ -273,9 +265,11 @@ trait Actor extends Logging {
"\n\tEither use:" +
"\n\t\t'val actor = Actor.newActor[MyActor]', or" +
"\n\t\t'val actor = Actor.newActor(() => new MyActor(..))'")
else ref.asInstanceOf[Some[ActorRef]]
else ref
}
implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
/**
* The 'self' field holds the ActorRef for this actor.
* <p/>
@ -284,29 +278,32 @@ trait Actor extends Logging {
* self ! message
* </pre>
*/
val self: ActorRef = _selfSenderRef.get
val self: ActorRef = optionSelf.get
self.id = getClass.getName
/**
* The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>.
* This means that all actors will share the same event-driven executor based dispatcher.
* User overridable callback/setting.
* <p/>
* You can override it so it fits the specific use-case that the actor is used for.
* See the <tt>se.scalablesolutions.akka.dispatch.Dispatchers</tt> class for the different
* dispatchers available.
* Partial function implementing the actor logic.
* To be implemented by subclassing actor.
* <p/>
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
* Example code:
* <pre>
* def receive = {
* case Ping =&gt;
* println("got a ping")
* self.reply("pong")
*
* case OneWay =&gt;
* println("got a oneway")
*
* case _ =&gt;
* println("unknown message, ignoring")
* }
* </pre>
*/
private[akka] var _messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
/**
* Holds the hot swapped partial function.
*/
private var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
// ========================================
// ==== CALLBACKS FOR USER TO OVERRIDE ====
// ========================================
protected def receive: PartialFunction[Any, Unit]
/**
* User overridable callback/setting.
@ -348,282 +345,14 @@ trait Actor extends Logging {
*/
def shutdown {}
/**
* User overridable callback/setting.
* <p/>
* Partial function implementing the actor logic.
* To be implemented by subclassing actor.
* <p/>
* Example code:
* <pre>
* def receive = {
* case Ping =&gt;
* println("got a ping")
* reply("pong")
*
* case OneWay =&gt;
* println("got a oneway")
*
* case _ =&gt;
* println("unknown message, ignoring")
* }
* </pre>
*/
protected def receive: PartialFunction[Any, Unit]
// ==================
// ==== USER API ====
// ==================
/**
* Forwards the message and passes the original sender actor as the sender.
* <p/>
* Works with '!', '!!' and '!!!'.
*/
def forward(message: Any)(implicit sender: Some[ActorRef]) = self.forward(message)(sender)
/**
* User overridable callback/setting.
* <p/>
* Identifier for actor, does not have to be a unique one.
* Default is the class name.
* <p/>
* This field is used for logging, AspectRegistry.actorsFor, identifier for remote actor in RemoteServer etc.
* But also as the identifier for persistence, which means that you can
* use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
*/
protected[akka] var id: String = this.getClass.getName
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
@volatile var timeout: Long = Actor.TIMEOUT
/**
* User overridable callback/setting.
* <p/>
* Set trapExit to the list of exception classes that the actor should be able to trap
* from the actor it is supervising. When the supervising actor throws these exceptions
* then they will trigger a restart.
* <p/>
* <pre>
* // trap no exceptions
* trapExit = Nil
*
* // trap all exceptions
* trapExit = List(classOf[Throwable])
*
* // trap specific exceptions only
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
* </pre>
*/
protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil
/**
* User overridable callback/setting.
* <p/>
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
* Can be one of:
* <pre/>
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
*
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
* </pre>
*/
protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None
/**
* User overridable callback/setting.
* <p/>
* Defines the life-cycle for a supervised actor.
*/
@volatile var lifeCycle: Option[LifeCycle] = None
/**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to
*/
protected[this] def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'." +
"\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
protected[this] def reply_?(message: Any): Boolean = self.replyTo match {
case Some(Left(actor)) =>
actor ! message
true
case Some(Right(future: Future[Any])) =>
future completeWithResult message
true
case _ =>
false
}
/**
* Starts the actor.
*/
def start = self.startOnCreation = true
/**
* Shuts down the actor its dispatcher and message queue.
* Alias for 'stop'.
*/
def exit = self.stop
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = self.stop
/**
* Returns the uuid for the actor.
*/
def uuid = self.uuid
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
* <p/>
* The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>.
* This means that all actors will share the same event-driven executor based dispatcher.
* <p/>
* You can override it so it fits the specific use-case that the actor is used for.
* See the <tt>se.scalablesolutions.akka.dispatch.Dispatchers</tt> class for the different
* dispatchers available.
* <p/>
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
def dispatcher_=(md: MessageDispatcher): Unit =
if (!self.isRunning) _messageDispatcher = md
else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
/**
* Get the dispatcher for this actor.
*/
def dispatcher: MessageDispatcher = _messageDispatcher
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
* If transactionality want to be completely turned off then do it by invoking:
* <pre/>
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired = self.makeTransactionRequired
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(hostname: String, port: Int): Unit = makeRemote(new InetSocketAddress(hostname, port))
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(address: InetSocketAddress): Unit = self.makeRemote(address)
/**
* Set the contact address for this actor. This is used for replying to messages sent
* asynchronously when no reply channel exists.
*/
def setReplyToAddress(hostname: String, port: Int): Unit = self.setReplyToAddress(new InetSocketAddress(hostname, port))
/**
* Set the contact address for this actor. This is used for replying to messages sent
* asynchronously when no reply channel exists.
*/
def setReplyToAddress(address: InetSocketAddress): Unit = self.setReplyToAddress(address)
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
* receive a notification if the linked actor has crashed.
* <p/>
* If the 'trapExit' member field has been set to at contain at least one exception class then it will
* 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
* defined by the 'faultHandler'.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def link(actorRef: ActorRef) = self.link(actorRef)
/**
* Unlink the actor.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def unlink(actorRef: ActorRef) = self.unlink(actorRef)
/**
* Atomically start and link an actor.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def startLink(actorRef: ActorRef) = self.startLink(actorRef)
/**
* Atomically start, link and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) =
self.startLinkRemote(actorRef, hostname, port)
/**
* Atomically create (from actor class) and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawn[T <: Actor : Manifest]: ActorRef = self.spawn[T]
/**
* Atomically create (from actor class), start and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef =
self.spawnRemote[T](hostname, port)
/**
* Atomically create (from actor class), start and link an actor.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = self.spawnLink[T]
/**
* User overridable callback/setting.
* <p/>
* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
* start if there is no one running, else it joins the existing transaction.
*/
protected[this] def isTransactor_=(flag: Boolean) = self.isTransactor = flag
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
private[akka] def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private[akka] def base: PartialFunction[Any, Unit] = lifeCycles orElse (self.hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
case HotSwap(code) => _hotswap = code
case HotSwap(code) => self.hotswap = code
case Restart(reason) => self.restart(reason)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Unlink(child) => self.unlink(child)
@ -631,15 +360,11 @@ trait Actor extends Logging {
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
override def hashCode(): Int = HashCode.hash(HashCode.SEED, uuid)
override def hashCode: Int = self.hashCode
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Actor] &&
that.asInstanceOf[Actor].uuid == uuid
}
override def equals(that: Any): Boolean = self.equals(that)
override def toString = "Actor[" + id + ":" + uuid + "]"
override def toString = self.toString
}
/**

View file

@ -14,7 +14,7 @@ import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol, ActorRefProtocol}
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReadWriteLock}
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard}
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.commitbarriers.CountDownCommitBarrier
@ -22,7 +22,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
import jsr166x.{Deque, ConcurrentLinkedDeque}
import java.net.InetSocketAddress
import java.util.concurrent.locks.{Lock, ReentrantLock}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
@ -115,10 +115,89 @@ trait ActorRef extends TransactionManagement {
@volatile protected[this] var _isSuspended = true
@volatile protected[this] var _isShutDown = false
@volatile protected[akka] var _isKilled = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[this] val guard = new ReadWriteLock
protected[this] val guard = new ReentrantGuard
/**
* User overridable callback/setting.
* <p/>
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
* <p/>
* This field is used for logging, AspectRegistry.actorsFor, identifier for remote
* actor in RemoteServer etc.But also as the identifier for persistence, which means
* that you can use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
*/
@volatile var id: String = _uuid
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
@volatile var timeout: Long = Actor.TIMEOUT
/**
* User overridable callback/setting.
* <p/>
* Set trapExit to the list of exception classes that the actor should be able to trap
* from the actor it is supervising. When the supervising actor throws these exceptions
* then they will trigger a restart.
* <p/>
* <pre>
* // trap no exceptions
* trapExit = Nil
*
* // trap all exceptions
* trapExit = List(classOf[Throwable])
*
* // trap specific exceptions only
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
* </pre>
*/
@volatile var trapExit: List[Class[_ <: Throwable]] = Nil
/**
* User overridable callback/setting.
* <p/>
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
* Can be one of:
* <pre/>
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
*
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
* </pre>
*/
@volatile var faultHandler: Option[FaultHandlingStrategy] = None
/**
* User overridable callback/setting.
* <p/>
* Defines the life-cycle for a supervised actor.
*/
@volatile var lifeCycle: Option[LifeCycle] = None
/**
* The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>.
* This means that all actors will share the same event-driven executor based dispatcher.
* <p/>
* You can override it so it fits the specific use-case that the actor is used for.
* See the <tt>se.scalablesolutions.akka.dispatch.Dispatchers</tt> class for the different
* dispatchers available.
* <p/>
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
/**
* Holds the hot swapped partial function.
*/
protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
/**
* User overridable callback/setting.
@ -132,7 +211,7 @@ trait ActorRef extends TransactionManagement {
* This lock ensures thread safety in the dispatching: only one message can
* be dispatched at once on the actor.
*/
protected[akka] val dispatcherLock: Lock = new ReentrantLock
protected[akka] val dispatcherLock = new ReentrantLock
/**
* Holds the reference to the sender of the currently processed message.
@ -141,10 +220,8 @@ trait ActorRef extends TransactionManagement {
* - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
*/
protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] =
guard.withReadLock { _replyTo }
protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) =
guard.withWriteLock { _replyTo = rt }
protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = guard.withGuard { _replyTo }
protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt }
/**
* Is the actor killed?
@ -165,6 +242,10 @@ trait ActorRef extends TransactionManagement {
* Returns the uuid for the actor.
*/
def uuid = _uuid
/**
* Only for internal use. UUID is effectively final.
*/
protected[akka] def uuid_=(uid: String) = _uuid = uid
/**
@ -196,7 +277,7 @@ trait ActorRef extends TransactionManagement {
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !![T](message: Any, timeout: Long): Option[T] = {
@ -204,8 +285,9 @@ trait ActorRef extends TransactionManagement {
if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid)
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) {
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
}
try {
future.await
} catch {
@ -232,7 +314,7 @@ trait ActorRef extends TransactionManagement {
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout)
@ -243,7 +325,7 @@ trait ActorRef extends TransactionManagement {
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
* If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>reply(..)</code>
* If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !!![T](message: Any): Future[T] = {
@ -269,6 +351,39 @@ trait ActorRef extends TransactionManagement {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to
*/
def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'." +
"\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
def reply_?(message: Any): Boolean = replyTo match {
case Some(Left(actor)) =>
actor ! message
true
case Some(Right(future: Future[Any])) =>
future completeWithResult message
true
case _ =>
false
}
/**
* Serializes the ActorRef instance into a byte array (Array[Byte]).
*/
@ -310,22 +425,20 @@ trait ActorRef extends TransactionManagement {
def makeTransactionRequired: Unit
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
* Returns the home address and port for this actor.
*/
def setReplyToAddress(hostname: String, port: Int): Unit =
setReplyToAddress(new InetSocketAddress(hostname, port))
def homeAddress: InetSocketAddress = _homeAddress
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
* Set the home address and port for this actor.
*/
def setReplyToAddress(address: InetSocketAddress): Unit
def homeAddress_=(hostnameAndPort: Tuple2[String, Int]): Unit =
homeAddress_=(new InetSocketAddress(hostnameAndPort._1, hostnameAndPort._2))
/**
* Returns the id for the actor.
* Set the home address and port for this actor.
*/
def id: String
def homeAddress_=(address: InetSocketAddress): Unit
/**
* Returns the remote address for the actor, if any, else None.
@ -333,20 +446,6 @@ trait ActorRef extends TransactionManagement {
def remoteAddress: Option[InetSocketAddress]
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
def timeout: Long
/**
* Sets the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
def timeout_=(t: Long)
/**
* Starts up the actor and its message queue.
*/
@ -456,15 +555,6 @@ trait ActorRef extends TransactionManagement {
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit
protected[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit
protected[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits
protected[akka] def lifeCycle: Option[LifeCycle] = actor.lifeCycle
protected[akka] def lifeCycle_=(cycle: Option[LifeCycle]) = actor.lifeCycle = cycle
protected[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler
protected[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler
protected[akka] def mailbox: Deque[MessageInvocation]
protected[akka] def restart(reason: Throwable): Unit
@ -479,20 +569,38 @@ trait ActorRef extends TransactionManagement {
protected[akka] def linkedActorsAsList: List[ActorRef]
override def toString: String
override def hashCode: Int
override def equals(that: Any): Boolean
override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[ActorRef] &&
that.asInstanceOf[ActorRef].uuid == uuid
}
override def toString = "Actor[" + id + ":" + uuid + "]"
protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = {
senderOption.foreach { sender =>
val address = sender.homeAddress
val server = RemoteServer.serverFor(address) match {
case Some(server) => server
case None => (new RemoteServer).start(address)
}
server.register(sender.uuid, sender)
requestBuilder.setSender(sender.toProtocol)
}
}
}
/**
* Local ActorRef that is used when referencing the Actor on its "home" node.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class LocalActorRef private[akka](
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
extends ActorRef {
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
@ -500,20 +608,21 @@ sealed class LocalActorRef private[akka](
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None
@volatile private[akka] var _supervisor: Option[ActorRef] = None
@volatile private[akka] var _replyToAddress: Option[InetSocketAddress] = None
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
protected[this] val actorInstance = new AtomicReference[Actor](newActor)
if (startOnCreation) start
@volatile private var isInInitialization = false
@volatile private var runActorInitialization = false
if (runActorInitialization) initializeActorInstance
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
protected[akka] def toProtocol: ActorRefProtocol = guard.withWriteLock {
val (host, port) = _replyToAddress.map(address =>
(address.getHostName, address.getPort))
.getOrElse((Actor.HOSTNAME, Actor.PORT))
protected[akka] def toProtocol: ActorRefProtocol = guard.withGuard {
val host = homeAddress.getHostName
val port = homeAddress.getPort
if (!registeredInRemoteNodeDuringSerialization) {
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
@ -549,8 +658,8 @@ sealed class LocalActorRef private[akka](
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(md: MessageDispatcher): Unit = guard.withWriteLock {
if (!isRunning) actor.dispatcher = md
def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard {
if (!isRunning) _dispatcher = md
else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
@ -558,7 +667,7 @@ sealed class LocalActorRef private[akka](
/**
* Get the dispatcher for this actor.
*/
def dispatcher: MessageDispatcher = guard.withReadLock { actor.dispatcher }
def dispatcher: MessageDispatcher = guard.withGuard { _dispatcher }
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
@ -571,13 +680,13 @@ sealed class LocalActorRef private[akka](
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(address: InetSocketAddress): Unit = guard.withWriteLock {
def makeRemote(address: InetSocketAddress): Unit = guard.withGuard {
if (isRunning) throw new IllegalStateException(
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else {
_remoteAddress = Some(address)
RemoteClient.register(address.getHostName, address.getPort, uuid)
if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT)
homeAddress = (RemoteServer.HOSTNAME, RemoteServer.PORT)
}
}
@ -589,7 +698,7 @@ sealed class LocalActorRef private[akka](
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired = guard.withWriteLock {
def makeTransactionRequired = guard.withGuard {
if (isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactor = true
@ -599,57 +708,32 @@ sealed class LocalActorRef private[akka](
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
*/
def setReplyToAddress(address: InetSocketAddress): Unit =
guard.withReadLock { _replyToAddress = Some(address) }
/**
* Returns the id for the actor.
*/
def id = actor.id
def homeAddress_=(address: InetSocketAddress): Unit = guard.withGuard { _homeAddress = address }
/**
* Returns the remote address for the actor, if any, else None.
*/
def remoteAddress: Option[InetSocketAddress] = guard.withReadLock { _remoteAddress }
def remoteAddress: Option[InetSocketAddress] = guard.withGuard { _remoteAddress }
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit =
guard.withWriteLock { _remoteAddress = addr }
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
def timeout: Long = actor.timeout
/**
* Sets the default timeout for '!!' and '!!!' invocations,
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
def timeout_=(t: Long) = actor.timeout = t
guard.withGuard { _remoteAddress = addr }
/**
* Starts up the actor and its message queue.
*/
def start: ActorRef = guard.withWriteLock {
def start: ActorRef = guard.withGuard {
if (isShutdown) throw new IllegalStateException(
"Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
dispatcher.register(this)
dispatcher.start
_isRunning = true
actor.init
actor.initTransactionalState
if (!isInInitialization) initializeActorInstance
else runActorInitialization = true
}
Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(this)
this
}
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = guard.withWriteLock {
def stop = guard.withGuard {
if (isRunning) {
dispatcher.unregister(this)
_isRunning = false
@ -672,7 +756,7 @@ sealed class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def link(actorRef: ActorRef) = guard.withWriteLock {
def link(actorRef: ActorRef) = guard.withGuard {
if (actorRef.supervisor.isDefined) throw new IllegalStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
linkedActors.put(actorRef.uuid, actorRef)
@ -685,7 +769,7 @@ sealed class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def unlink(actorRef: ActorRef) = guard.withWriteLock {
def unlink(actorRef: ActorRef) = guard.withGuard {
if (!linkedActors.containsKey(actorRef.uuid)) throw new IllegalStateException(
"Actor [" + actorRef + "] is not a linked actor, can't unlink")
linkedActors.remove(actorRef.uuid)
@ -698,7 +782,7 @@ sealed class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def startLink(actorRef: ActorRef) = guard.withWriteLock {
def startLink(actorRef: ActorRef) = guard.withGuard {
try {
actorRef.start
} finally {
@ -711,7 +795,7 @@ sealed class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withWriteLock {
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withGuard {
try {
actorRef.makeRemote(hostname, port)
actorRef.start
@ -725,7 +809,7 @@ sealed class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def spawn[T <: Actor : Manifest]: ActorRef = guard.withWriteLock {
def spawn[T <: Actor : Manifest]: ActorRef = guard.withGuard {
val actorRef = spawnButDoNotStart[T]
actorRef.start
actorRef
@ -736,7 +820,7 @@ sealed class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock {
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = guard.withGuard {
val actor = spawnButDoNotStart[T]
actor.makeRemote(hostname, port)
actor.start
@ -748,7 +832,7 @@ sealed class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLink[T <: Actor: Manifest]: ActorRef = guard.withWriteLock {
def spawnLink[T <: Actor: Manifest]: ActorRef = guard.withGuard {
val actor = spawnButDoNotStart[T]
try {
actor.start
@ -763,7 +847,7 @@ sealed class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = guard.withWriteLock {
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = guard.withGuard {
val actor = spawnButDoNotStart[T]
try {
actor.makeRemote(hostname, port)
@ -781,25 +865,19 @@ sealed class LocalActorRef private[akka](
/**
* Shuts down and removes all linked actors.
*/
def shutdownLinkedActors: Unit = guard.withWriteLock {
def shutdownLinkedActors: Unit = guard.withGuard {
linkedActorsAsList.foreach(_.stop)
linkedActors.clear
}
override def toString: String = actor.toString
override def hashCode: Int = actor.hashCode
override def equals(that: Any): Boolean = actor.equals(that)
/**
* Returns the supervisor, if there is one.
*/
def supervisor: Option[ActorRef] = guard.withReadLock { _supervisor }
def supervisor: Option[ActorRef] = guard.withGuard { _supervisor }
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withWriteLock { _supervisor = sup }
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withGuard { _supervisor = sup }
private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withWriteLock {
private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard {
val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance
val actorRef = Actor.newActor(() => actor)
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
@ -809,6 +887,7 @@ sealed class LocalActorRef private[akka](
}
private[this] def newActor: Actor = {
isInInitialization = true
Actor.actorRefInCreation.value = Some(this)
val actor = actorFactory match {
case Left(Some(clazz)) =>
@ -828,6 +907,7 @@ sealed class LocalActorRef private[akka](
}
if (actor eq null) throw new ActorInitializationException(
"Actor instance passed to ActorRef can not be 'null'")
isInInitialization = false
actor
}
@ -846,8 +926,7 @@ sealed class LocalActorRef private[akka](
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
processSender(senderOption, requestBuilder)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None)
@ -897,26 +976,6 @@ sealed class LocalActorRef private[akka](
}
}
protected[akka] def restart(reason: Throwable): Unit = {
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
val failedActor = actorInstance.get
failedActor.synchronized {
Actor.log.debug("Restarting linked actors for actor [%s].", id)
restartLinkedActors(reason)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
failedActor.preRestart(reason)
stop
val freshActor = newActor
freshActor.synchronized {
actorInstance.set(freshActor)
start
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
freshActor.postRestart(reason)
}
_isKilled = false
}
}
private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
// FIXME test to run bench without this trace call
Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
@ -938,11 +997,11 @@ sealed class LocalActorRef private[akka](
}
}
private def dispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock {
private def dispatch[T](messageHandle: MessageInvocation) = {
setTransactionSet(messageHandle.transactionSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
replyTo = messageHandle.replyTo
_replyTo = messageHandle.replyTo
try {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
@ -963,7 +1022,7 @@ sealed class LocalActorRef private[akka](
}
}
private def transactionalDispatch[T](messageHandle: MessageInvocation) = guard.withWriteLock {
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] =
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
@ -979,7 +1038,7 @@ sealed class LocalActorRef private[akka](
setTransactionSet(txSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
replyTo = messageHandle.replyTo
_replyTo = messageHandle.replyTo
def proceed = {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
@ -1020,21 +1079,49 @@ sealed class LocalActorRef private[akka](
}
}
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = guard.withReadLock {
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = {
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
if (faultHandler.isDefined) {
faultHandler.get match {
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
case AllForOneStrategy(maxNrOfRetries, withinTimeRange) =>
restartLinkedActors(reason)
case OneForOneStrategy(maxNrOfRetries, withinTimeRange) =>
dead.restart(reason)
}
} else throw new IllegalStateException(
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
"\n\tto non-empty list of exception classes - can't proceed " + toString)
} else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on
} else {
_supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on
}
}
protected[akka] def restartLinkedActors(reason: Throwable) = guard.withWriteLock {
protected[akka] def restart(reason: Throwable): Unit = {
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
restartLinkedActors(reason)
val failedActor = actorInstance.get
failedActor.synchronized {
Actor.log.debug("Restarting linked actors for actor [%s].", id)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
failedActor.preRestart(reason)
failedActor.shutdown
_isRunning = false
val freshActor = newActor
freshActor.synchronized {
freshActor.init
freshActor.initTransactionalState
_isRunning = true
actorInstance.set(freshActor)
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
freshActor.postRestart(reason)
}
_isKilled = false
}
}
protected[akka] def restartLinkedActors(reason: Throwable) = guard.withGuard {
linkedActorsAsList.foreach { actorRef =>
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
actorRef.lifeCycle.get match {
@ -1060,14 +1147,14 @@ sealed class LocalActorRef private[akka](
}
}
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withWriteLock {
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withGuard {
if (_supervisor.isDefined) {
RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
} else None
}
protected[akka] def linkedActors: JMap[String, ActorRef] = guard.withWriteLock {
protected[akka] def linkedActors: JMap[String, ActorRef] = guard.withGuard {
if (_linkedActors.isEmpty) {
val actors = new ConcurrentHashMap[String, ActorRef]
_linkedActors = Some(actors)
@ -1078,6 +1165,17 @@ sealed class LocalActorRef private[akka](
protected[akka] def linkedActorsAsList: List[ActorRef] =
linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]]
private def initializeActorInstance = if (!isRunning) {
dispatcher.register(this)
dispatcher.start
actor.init // run actor init and initTransactionalState callbacks
actor.initTransactionalState
Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
_isRunning = true
}
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] &&
!message.isInstanceOf[Byte] &&
@ -1105,8 +1203,9 @@ sealed class LocalActorRef private[akka](
}
/**
* Remote Actor proxy.
*
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
* This reference is network-aware (remembers its origin) and immutable.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
@ -1114,8 +1213,10 @@ private[akka] case class RemoteActorRef private[akka] (
uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long)
extends ActorRef {
_uuid = uuuid
timeout = _timeout
start
Thread.sleep(1000)
val remoteClient = RemoteClient.clientFor(hostname, port)
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
@ -1127,7 +1228,7 @@ private[akka] case class RemoteActorRef private[akka] (
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
processSender(senderOption, requestBuilder)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send[Any](requestBuilder.build, None)
}
@ -1144,15 +1245,12 @@ private[akka] case class RemoteActorRef private[akka] (
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
//senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val future = remoteClient.send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
}
def timeout: Long = _timeout
def start: ActorRef = {
_isRunning = true
this
@ -1168,13 +1266,11 @@ private[akka] case class RemoteActorRef private[akka] (
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def makeTransactionRequired: Unit = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
def makeTransactionRequired: Unit = unsupported
def setReplyToAddress(address: InetSocketAddress): Unit = unsupported
def id: String = unsupported
def homeAddress_=(address: InetSocketAddress): Unit = unsupported
def remoteAddress: Option[InetSocketAddress] = unsupported
def timeout_=(t: Long) = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported
@ -1198,5 +1294,6 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
protected[this] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}

View file

@ -98,39 +98,12 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* @author Viktor Klang
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Agent[T] private (initialValue: T) extends Transactor {
sealed class Agent[T] private (initialValue: T) {
import Agent._
import Actor._
// _selfSenderRef = Some(newActor(() => this).start)
log.debug("Starting up Agent [%s]", uuid)
private lazy val value = Ref[T]()
self ! Value(initialValue)
/**
* Periodically handles incoming messages.
*/
def receive = {
case Value(v: T) =>
swap(v)
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>
proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null"))))
}
/**
* Specifies how a copy of the value is made, defaults to using identity.
*/
protected def copyStrategy(t: T): T = t
/**
* Performs a CAS operation, atomically swapping the internal state with the value
* provided as a by-name parameter.
*/
private final def swap(newData: => T): Unit = value.swap(newData)
private val dispatcher = newActor(() => new AgentDispatcher[T](initialValue)).start
dispatcher ! Value(initialValue)
/**
* Submits a request to read the internal state.
@ -140,8 +113,9 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
* method and then waits for its result on a CountDownLatch.
*/
final def get: T = {
if (self.isTransactionInScope) throw new AgentException(
"Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.")
if (dispatcher.isTransactionInScope) throw new AgentException(
"Can't call Agent.get within an enclosing transaction."+
"\n\tWould block indefinitely.\n\tPlease refactor your code.")
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
sendProc((v: T) => {ref.set(v); latch.countDown})
@ -159,22 +133,22 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
/**
* Submits the provided function for execution against the internal agent's state.
*/
final def apply(message: (T => T)): Unit = self ! Function(message)
final def apply(message: (T => T)): Unit = dispatcher ! Function(message)
/**
* Submits a new value to be set as the new agent's internal state.
*/
final def apply(message: T): Unit = self ! Value(message)
final def apply(message: T): Unit = dispatcher ! Value(message)
/**
* Submits the provided function of type 'T => T' for execution against the internal agent's state.
*/
final def send(message: (T) => T): Unit = self ! Function(message)
final def send(message: (T) => T): Unit = dispatcher ! Function(message)
/**
* Submits a new value to be set as the new agent's internal state.
*/
final def send(message: T): Unit = self ! Value(message)
final def send(message: T): Unit = dispatcher ! Value(message)
/**
* Asynchronously submits a procedure of type 'T => Unit' to read the internal state.
@ -182,7 +156,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
* of the internal state will be used, depending on the underlying effective copyStrategy.
* Does not change the value of the agent (this).
*/
final def sendProc(f: (T) => Unit): Unit = self ! Procedure(f)
final def sendProc(f: (T) => Unit): Unit = dispatcher ! Procedure(f)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
@ -207,7 +181,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
*
* A closed agent can never be used again.
*/
def close = stop
def close = dispatcher.stop
}
/**
@ -221,20 +195,44 @@ object Agent {
/*
* The internal messages for passing around requests.
*/
private case class Value[T](value: T)
private case class Function[T](fun: ((T) => T))
private case class Procedure[T](fun: ((T) => Unit))
private[akka] case class Value[T](value: T)
private[akka] case class Function[T](fun: ((T) => T))
private[akka] case class Procedure[T](fun: ((T) => Unit))
/**
* Creates a new Agent of type T with the initial value of value.
*/
def apply[T](value: T): Agent[T] = new Agent(value)
}
/**
* Agent dispatcher Actor.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor {
import Agent._
import Actor._
log.debug("Starting up Agent [%s]", self.uuid)
private lazy val value = Ref[T]()
/**
* Periodically handles incoming messages.
*/
def receive = {
case Value(v: T) =>
swap(v)
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>
proc(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null")))
}
/**
* Creates a new Agent of type T with the initial value of value and with the
* specified copy function.
* Performs a CAS operation, atomically swapping the internal state with the value
* provided as a by-name parameter.
*/
def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) {
override def copyStrategy(t: T) = newCopyStrategy(t)
}
private final def swap(newData: => T): Unit = value.swap(newData)
}

View file

@ -27,7 +27,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio
* which is licensed under the Apache 2 License.
*/
class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
lifeCycle = Some(LifeCycle(Permanent))
self.lifeCycle = Some(LifeCycle(Permanent))
def receive = {
case UnSchedule =>
@ -42,12 +42,12 @@ object Scheduler extends Actor {
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef]
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.trapExit = List(classOf[Throwable])
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
try {
startLink(newActor(() => new ScheduleActor(
self.startLink(newActor(() => new ScheduleActor(
receiver,
service.scheduleAtFixedRate(new java.lang.Runnable {
def run = receiver ! message;
@ -60,7 +60,7 @@ object Scheduler extends Actor {
def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
def stopSupervising(actorRef: ActorRef) = {
unlink(actorRef)
self.unlink(actorRef)
schedulers.remove(actorRef)
}

View file

@ -164,7 +164,8 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
sealed class Supervisor private[akka] (
handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
extends Configurator {
private val children = new ConcurrentHashMap[String, List[ActorRef]]
@ -202,7 +203,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
else list
}
children.put(className, actorRef :: currentActors)
actorRef.actor.lifeCycle = Some(lifeCycle)
actorRef.lifeCycle = Some(lifeCycle)
supervisor ! Link(actorRef)
remoteAddress.foreach(address => RemoteServer.actorsFor(
RemoteServer.Address(address.hostname, address.port))
@ -241,16 +242,16 @@ final class SupervisorActor private[akka] (
handler: FaultHandlingStrategy,
trapExceptions: List[Class[_ <: Throwable]])
extends Actor {
trapExit = trapExceptions
faultHandler = Some(handler)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
self.trapExit = trapExceptions
self.faultHandler = Some(handler)
override def shutdown: Unit = self.shutdownLinkedActors
def receive = {
case Link(child) => startLink(child)
case Unlink(child) => unlink(child)
case UnlinkAndStop(child) => unlink(child); child.stop
case Link(child) => self.startLink(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case unknown => throw new IllegalArgumentException(
"Supervisor can only respond to 'Link' and 'Unlink' messages. Unknown message [" + unknown + "]")
}

View file

@ -80,5 +80,5 @@ object Dispatchers {
* <p/>
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)
def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)
}

View file

@ -14,11 +14,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker}
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker)
extends MessageDispatcher {
def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(Actor.newActor(() => actor)))
class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher {
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "thread-based:dispatcher:" + name
private val messageHandler = new ActorMessageInvoker(actor)
private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@ -27,7 +26,7 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
def start = if (!active) {
active = true
selectorThread = new Thread {
selectorThread = new Thread(threadName) {
override def run = {
while (active) {
try {

View file

@ -14,7 +14,7 @@ import se.scalablesolutions.akka.util.Logging
trait ThreadPoolBuilder {
val name: String
private val NR_START_THREADS = 4
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private val MILLISECONDS = TimeUnit.MILLISECONDS

View file

@ -182,6 +182,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getTimeout)
println("------ SETTING ID: " + request.getId + " " + name)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@ -263,6 +264,7 @@ class RemoteClientHandler(val name: String,
if (result.isInstanceOf[RemoteReplyProtocol]) {
val reply = result.asInstanceOf[RemoteReplyProtocol]
log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString)
println("------ GETTING ID: " + reply.getId + " " + name)
val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]]
if (reply.getIsSuccessful) {
val message = RemoteProtocolBuilder.getMessage(reply)

View file

@ -112,6 +112,9 @@ object RemoteServer {
else Some(server)
}
private[akka] def serverFor(address: InetSocketAddress): Option[RemoteServer] =
serverFor(address.getHostName, address.getPort)
private[remote] def register(hostname: String, port: Int, server: RemoteServer) =
remoteServers.put(Address(hostname, port), server)
@ -154,13 +157,22 @@ class RemoteServer extends Logging {
def isRunning = _isRunning
def start: Unit = start(None)
def start: RemoteServer =
start(hostname, port, None)
def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader)
def start(loader: Option[ClassLoader]): RemoteServer =
start(hostname, port, loader)
def start(_hostname: String, _port: Int): Unit = start(_hostname, _port, None)
def start(address: InetSocketAddress): RemoteServer =
start(address.getHostName, address.getPort, None)
def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): Unit = synchronized {
def start(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteServer =
start(address.getHostName, address.getPort, loader)
def start(_hostname: String, _port: Int): RemoteServer =
start(_hostname, _port, None)
def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
try {
if (!_isRunning) {
hostname = _hostname
@ -182,6 +194,7 @@ class RemoteServer extends Logging {
} catch {
case e => log.error(e, "Could not start up remote server")
}
this
}
def shutdown = synchronized {
@ -334,7 +347,7 @@ class RemoteServerHandler(
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) {
if (request.hasSender) {
val sender = request.getSender
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender)))
} else {

View file

@ -46,7 +46,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
extends Actor {
def receive = {
case Exit => exit
case message => reply(body(message.asInstanceOf[A]))
case message => self.reply(body(message.asInstanceOf[A]))
}
}
@ -64,7 +64,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
self.timeout = TIME_OUT
def receive = {
case Set(v) =>
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
@ -78,13 +78,13 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
}
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
self.timeout = TIME_OUT
private var readerFuture: Option[CompletableFuture[T]] = None
def receive = {
case Get =>
val ref = dataFlow.value.get
if (ref.isDefined)
reply(ref.get)
self.reply(ref.get)
else {
readerFuture = self.replyTo match {
case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]])
@ -348,7 +348,6 @@ object Test4 extends Application {
// =======================================
object Test5 extends Application {
import Actor.Sender.Self
import DataFlow._
// create four 'Int' data flow variables

View file

@ -1,35 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
import java.util.concurrent.locks.ReentrantReadWriteLock
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReadWriteLock {
private val rwl = new ReentrantReadWriteLock
private val readLock = rwl.readLock
private val writeLock = rwl.writeLock
def withWriteLock[T](body: => T): T = {
writeLock.lock
try {
body
} finally {
writeLock.unlock
}
}
def withReadLock[T](body: => T): T = {
readLock.lock
try {
body
} finally {
readLock.unlock
}
}
}

View file

@ -9,16 +9,18 @@ import Actor._
object ActorFireForgetRequestReplySpec {
class ReplyActor extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "Send" => reply("Reply")
case "SendImplicit" => self.replyTo.get.left.get ! "ReplyImplicit"
case "Send" =>
self.reply("Reply")
case "SendImplicit" =>
self.replyTo.get.left.get ! "ReplyImplicit"
}
}
class SenderActor(replyActor: ActorRef) extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "Init" => replyActor ! "Send"
@ -42,12 +44,11 @@ object ActorFireForgetRequestReplySpec {
class ActorFireForgetRequestReplySpec extends JUnitSuite {
import ActorFireForgetRequestReplySpec._
@Test
def shouldReplyToBangMessageUsingReply = {
state.finished.reset
val replyActor = newActor[ReplyActor]
replyActor.start
val replyActor = newActor[ReplyActor].start
val senderActor = newActor(() => new SenderActor(replyActor))
senderActor.start
senderActor ! "Init"
@ -59,10 +60,8 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite {
@Test
def shouldReplyToBangMessageUsingImplicitSender = {
state.finished.reset
val replyActor = newActor[ReplyActor]
replyActor.start
val senderActor = newActor(() => new SenderActor(replyActor))
senderActor.start
val replyActor = newActor[ReplyActor].start
val senderActor = newActor(() => new SenderActor(replyActor)).start
senderActor ! "InitImplicit"
try { state.finished.await(1L, TimeUnit.SECONDS) }
catch { case e: TimeoutException => fail("Never got the message") }

View file

@ -7,11 +7,11 @@ import Actor._
object ActorRegistrySpec {
var record = ""
class TestActor extends Actor {
id = "MyID"
self.id = "MyID"
def receive = {
case "ping" =>
record = "pong" + record
reply("got ping")
self.reply("got ping")
}
}
}
@ -26,7 +26,7 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 1)
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.head.id === "MyID")
actor.stop
}
@ -48,7 +48,7 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 1)
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.head.id === "MyID")
actor.stop
}
@ -59,7 +59,7 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 1)
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.head.id === "MyID")
actor.stop
}
@ -72,9 +72,9 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.head.id === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.last.id === "MyID")
actor1.stop
actor2.stop
}
@ -88,9 +88,9 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.head.id === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.last.id === "MyID")
actor1.stop
actor2.stop
}
@ -104,9 +104,9 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.head.id === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.last.id === "MyID")
actor1.stop
actor2.stop
}
@ -120,9 +120,9 @@ class ActorRegistrySpec extends JUnitSuite {
val actors = ActorRegistry.actors
assert(actors.size === 2)
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.head.id === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].id === "MyID")
assert(actors.last.id === "MyID")
actor1.stop
actor2.stop
}

View file

@ -23,7 +23,7 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers {
agent send (_ * 2)
val result = agent()
result must be(12)
agent.stop
agent.close
}
@Test def testSendValue = {
@ -31,7 +31,7 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers {
agent send 6
val result = agent()
result must be(6)
agent.stop
agent.close
}
@Test def testSendProc = {
@ -42,7 +42,7 @@ class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers {
agent sendProc { e => result += e; latch.countDown }
assert(latch.await(5, TimeUnit.SECONDS))
result must be(10)
agent.stop
agent.close
}
@Test def testOneAgentsendWithinEnlosingTransactionSuccess = {

View file

@ -16,7 +16,7 @@ object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "OneWay" =>
@ -27,7 +27,7 @@ class RemoteActorSpecActorUnidirectional extends Actor {
class RemoteActorSpecActorBidirectional extends Actor {
def receive = {
case "Hello" =>
reply("World")
self.reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
@ -36,7 +36,7 @@ class RemoteActorSpecActorBidirectional extends Actor {
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
reply("World")
self.reply("World")
}
}
@ -64,15 +64,11 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
val PORT1 = 9990
val PORT2 = 9991
var s1: RemoteServer = null
var s2: RemoteServer = null
@Before
def init() {
s1 = new RemoteServer()
s2 = new RemoteServer()
s1.start(HOSTNAME, PORT1)
s2.start(HOSTNAME, PORT2)
Thread.sleep(1000)
}
@ -83,7 +79,6 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
@After
def finished() {
s1.shutdown
s2.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
}
@ -104,7 +99,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val sender = newActor[SendOneWayAndReplySenderActor]
sender.setReplyToAddress(HOSTNAME, PORT2)
sender.homeAddress = (HOSTNAME, PORT2)
sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendTo = actor
sender.start
sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendOff

View file

@ -8,10 +8,10 @@ import Actor._
object ExecutorBasedEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid)
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid)
def receive = {
case "Hello" =>
reply("World")
self.reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
@ -21,7 +21,7 @@ object ExecutorBasedEventDrivenDispatcherActorSpec {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid)
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid)
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}

View file

@ -14,8 +14,8 @@ import Actor._
*/
class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers {
class SlowActor(finishedCounter: CountDownLatch) extends Actor {
dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
id = "SlowActor"
self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
self.id = "SlowActor"
def receive = {
case x: Int => {
@ -26,8 +26,8 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
}
class FastActor(finishedCounter: CountDownLatch) extends Actor {
dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
id = "FastActor"
self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
self.id = "FastActor"
def receive = {
case x: Int => {

View file

@ -16,9 +16,9 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
dispatcher = delayableActorDispatcher
self.dispatcher = delayableActorDispatcher
var invocationCount = 0
id = name
self.id = name
def receive = {
case x: Int => {
@ -30,17 +30,17 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
}
class FirstActor extends Actor {
dispatcher = sharedActorDispatcher
self.dispatcher = sharedActorDispatcher
def receive = {case _ => {}}
}
class SecondActor extends Actor {
dispatcher = sharedActorDispatcher
self.dispatcher = sharedActorDispatcher
def receive = {case _ => {}}
}
class ParentActor extends Actor {
dispatcher = parentActorDispatcher
self.dispatcher = parentActorDispatcher
def receive = {case _ => {}}
}

View file

@ -18,7 +18,7 @@ object ForwardActorSpec {
ForwardState.sender = Some(self.replyTo.get.left.get)
latch.countDown
}
case "SendBangBang" => reply("SendBangBang")
case "SendBangBang" => self.reply("SendBangBang")
}
}

View file

@ -9,7 +9,7 @@ object FutureSpec {
class TestActor extends Actor {
def receive = {
case "Hello" =>
reply("World")
self.reply("World")
case "NoReply" => {}
case "Failure" =>
throw new RuntimeException("expected")

View file

@ -26,11 +26,10 @@ case class FailureOneWay(key: String, value: String, failer: ActorRef)
case object GetNotifier
class InMemStatefulActor(expectedInvocationCount: Int) extends Actor {
class InMemStatefulActor(expectedInvocationCount: Int) extends Transactor {
def this() = this(0)
timeout = 5000
makeTransactionRequired
self.timeout = 5000
val notifier = new CountDownLatch(expectedInvocationCount)
private lazy val mapState = TransactionalState.newMap[String, String]
@ -39,40 +38,40 @@ class InMemStatefulActor(expectedInvocationCount: Int) extends Actor {
def receive = {
case GetNotifier =>
reply(notifier)
self.reply(notifier)
case GetMapState(key) =>
reply(mapState.get(key).get)
self.reply(mapState.get(key).get)
notifier.countDown
case GetVectorSize =>
reply(vectorState.length.asInstanceOf[AnyRef])
self.reply(vectorState.length.asInstanceOf[AnyRef])
notifier.countDown
case GetRefState =>
reply(refState.get.get)
self.reply(refState.get.get)
notifier.countDown
case SetMapState(key, msg) =>
mapState.put(key, msg)
reply(msg)
self.reply(msg)
notifier.countDown
case SetVectorState(msg) =>
vectorState.add(msg)
reply(msg)
self.reply(msg)
notifier.countDown
case SetRefState(msg) =>
refState.swap(msg)
reply(msg)
self.reply(msg)
notifier.countDown
case Success(key, msg) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
reply(msg)
self.reply(msg)
notifier.countDown
case Failure(key, msg, failer) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
failer !! "Failure"
reply(msg)
self.reply(msg)
notifier.countDown
case SetMapStateOneWay(key, msg) =>
@ -99,8 +98,8 @@ class InMemStatefulActor(expectedInvocationCount: Int) extends Actor {
}
@serializable
class InMemFailerActor extends Actor {
makeTransactionRequired
class InMemFailerActor extends Transactor {
def receive = {
case "Failure" =>
throw new RuntimeException("expected")

View file

@ -30,7 +30,7 @@ object ProtobufActorMessageSerializationSpec {
def receive = {
case pojo: ProtobufPOJO =>
val id = pojo.getId
reply(id + 1)
self.reply(id + 1)
case msg =>
throw new RuntimeException("Expected a ProtobufPOJO message but got: " + msg)
}

View file

@ -9,11 +9,11 @@ import se.scalablesolutions.akka.dispatch.Dispatchers
object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(uuid)
self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(self.uuid)
def receive = {
case "Hello" =>
reply("World")
self.reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
@ -23,7 +23,7 @@ object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid)
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid)
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}

View file

@ -9,10 +9,10 @@ import Actor._
object ReactorBasedThreadPoolEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid)
self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid)
def receive = {
case "Hello" =>
reply("World")
self.reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
@ -27,7 +27,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
@Test def shouldSendOneWay {
val oneWay = new CountDownLatch(1)
val actor = newActor(() => new Actor {
dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid)
self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid)
def receive = {
case "OneWay" => oneWay.countDown
}

View file

@ -21,11 +21,11 @@ object Log {
}
@serializable class RemotePingPong1Actor extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case BinaryString("Ping") =>
Log.messageLog.put("ping")
reply("pong")
self.reply("pong")
case OneWay =>
Log.oneWayLog += "oneway"
@ -40,11 +40,11 @@ object Log {
}
@serializable class RemotePingPong2Actor extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case BinaryString("Ping") =>
Log.messageLog.put("ping")
reply("pong")
self.reply("pong")
case BinaryString("Die") =>
throw new RuntimeException("DIE")
}
@ -55,11 +55,11 @@ object Log {
}
@serializable class RemotePingPong3Actor extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case BinaryString("Ping") =>
Log.messageLog.put("ping")
reply("pong")
self.reply("pong")
case BinaryString("Die") =>
throw new RuntimeException("DIE")
}

View file

@ -6,12 +6,13 @@ import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import scala.reflect.BeanInfo
@BeanInfo
//@BeanInfo
case class Foo(foo: String) {
def this() = this(null)
}
@BeanInfo
//@BeanInfo
case class MyMessage(val id: String, val value: Tuple2[String, Int]) {
private def this() = this(null, null)
}

View file

@ -7,9 +7,10 @@ import se.scalablesolutions.akka.util.Logging
import Actor._
class HelloWorldActor extends Actor {
start
self.start
def receive = {
case "Hello" => reply("World")
case "Hello" => self.reply("World")
}
}

View file

@ -19,7 +19,6 @@ object ServerInitiatedRemoteActorSpec {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
start
def receive = {
case "OneWay" =>
@ -28,10 +27,10 @@ object ServerInitiatedRemoteActorSpec {
}
class RemoteActorSpecActorBidirectional extends Actor {
start
def receive = {
case "Hello" =>
reply("World")
self.reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
@ -41,17 +40,13 @@ object ServerInitiatedRemoteActorSpec {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorAsyncSender extends Actor {
start
def receive = {
case Send(actor: ActorRef) =>
actor ! "Hello"
case "World" =>
RemoteActorSpecActorAsyncSender.latch.countDown
}
def send(actor: ActorRef) {
self ! Send(actor)
}
}
}
@ -62,7 +57,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
private val unit = TimeUnit.MILLISECONDS
@Before
def init() {
def init {
server = new RemoteServer()
server.start(HOSTNAME, PORT)
@ -76,14 +71,18 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
// make sure the servers shutdown cleanly after the test has finished
@After
def finished() {
server.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
def finished {
try {
server.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
}
@Test
def shouldSendOneWay {
def shouldSendWithBang {
val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional",
5000L,
@ -94,7 +93,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
}
@Test
def shouldSendReplyAsync {
def shouldSendWithBangBangAndGetReply {
val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",
5000L,
@ -105,22 +104,22 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
}
@Test
def shouldSendRemoteReplyProtocol {
def shouldSendWithBangAndGetReplyThroughSenderRef {
implicit val timeout = 500000000L
val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",
timeout,
HOSTNAME, PORT)
val sender = newActor[RemoteActorSpecActorAsyncSender]
sender.setReplyToAddress(HOSTNAME, PORT)
sender.homeAddress = (HOSTNAME, PORT + 1)
sender.start
sender.send(actor)
sender ! Send(actor)
assert(RemoteActorSpecActorAsyncSender.latch.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test
def shouldSendReceiveException {
def shouldSendWithBangBangAndReplyWithException {
implicit val timeout = 500000000L
val actor = RemoteClient.actorFor(
"se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",

View file

@ -4,8 +4,6 @@
package se.scalablesolutions.akka.actor
import java.util.concurrent.{TimeUnit, BlockingQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.{OneWay, Die, Ping}
@ -13,16 +11,23 @@ import Actor._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, BlockingQueue, LinkedBlockingQueue}
object SupervisorSpec {
var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
var oneWayLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
var messageLog = new LinkedBlockingQueue[String]
var oneWayLog = new LinkedBlockingQueue[String]
def clearMessageLogs {
messageLog.clear
oneWayLog.clear
}
class PingPong1Actor extends Actor {
self.timeout = 1000
def receive = {
case Ping =>
messageLog.put("ping")
reply("pong")
self.reply("pong")
case OneWay =>
oneWayLog.put("oneway")
@ -36,10 +41,11 @@ object SupervisorSpec {
}
class PingPong2Actor extends Actor {
self.timeout = 1000
def receive = {
case Ping =>
messageLog.put("ping")
reply("pong")
self.reply("pong")
case Die =>
throw new RuntimeException("DIE")
}
@ -49,10 +55,11 @@ object SupervisorSpec {
}
class PingPong3Actor extends Actor {
self.timeout = 1000
def receive = {
case Ping =>
messageLog.put("ping")
reply("pong")
self.reply("pong")
case Die =>
throw new RuntimeException("DIE")
}
@ -74,31 +81,31 @@ class SupervisorSpec extends JUnitSuite {
var pingpong3: ActorRef = _
@Test def shouldStartServer = {
messageLog.clear
clearMessageLogs
val sup = getSingleActorAllForOneSupervisor
sup.start
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
}
@Test def shouldStartServerForNestedSupervisorHierarchy = {
messageLog.clear
clearMessageLogs
val sup = getNestedSupervisorsAllForOneConf
sup.start
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
}
@Test def shouldKillSingleActorOneForOne = {
messageLog.clear
clearMessageLogs
val sup = getSingleActorOneForOneSupervisor
sup.start
intercept[RuntimeException] {
pingpong1 !! Die
pingpong1 !! (Die, 100)
}
expect("DIE") {
@ -107,25 +114,25 @@ class SupervisorSpec extends JUnitSuite {
}
@Test def shouldCallKillCallSingleActorOneForOne = {
messageLog.clear
clearMessageLogs
val sup = getSingleActorOneForOneSupervisor
sup.start
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong1 !! Die
pingpong1 !! (Die, 100)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
@ -134,11 +141,11 @@ class SupervisorSpec extends JUnitSuite {
}
@Test def shouldKillSingleActorAllForOne = {
messageLog.clear
clearMessageLogs
val sup = getSingleActorAllForOneSupervisor
sup.start
intercept[RuntimeException] {
pingpong1 !! Die
pingpong1 !! (Die, 100)
}
expect("DIE") {
@ -147,25 +154,25 @@ class SupervisorSpec extends JUnitSuite {
}
@Test def shouldCallKillCallSingleActorAllForOne = {
messageLog.clear
clearMessageLogs
val sup = getSingleActorAllForOneSupervisor
sup.start
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong1 !! Die
pingpong1 !! (Die, 100)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
@ -174,11 +181,11 @@ class SupervisorSpec extends JUnitSuite {
}
@Test def shouldKillMultipleActorsOneForOne = {
messageLog.clear
clearMessageLogs
val sup = getMultipleActorsOneForOneConf
sup.start
intercept[RuntimeException] {
pingpong3 !! Die
pingpong3 !! (Die, 100)
}
expect("DIE") {
@ -186,20 +193,20 @@ class SupervisorSpec extends JUnitSuite {
}
}
def tesCallKillCallMultipleActorsOneForOne = {
messageLog.clear
@Test def shouldKillCallMultipleActorsOneForOne = {
clearMessageLogs
val sup = getMultipleActorsOneForOneConf
sup.start
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
(pingpong2 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
(pingpong3 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
@ -212,22 +219,22 @@ class SupervisorSpec extends JUnitSuite {
messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! Die
pingpong2 !! (Die, 100)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
(pingpong2 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
(pingpong3 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
@ -242,11 +249,11 @@ class SupervisorSpec extends JUnitSuite {
}
@Test def shouldKillMultipleActorsAllForOne = {
messageLog.clear
clearMessageLogs
val sup = getMultipleActorsAllForOneConf
sup.start
intercept[RuntimeException] {
pingpong2 !! Die
pingpong2 !! (Die, 100)
}
expect("DIE") {
@ -260,20 +267,20 @@ class SupervisorSpec extends JUnitSuite {
}
}
def tesCallKillCallMultipleActorsAllForOne = {
messageLog.clear
@Test def shouldCallKillCallMultipleActorsAllForOne = {
clearMessageLogs
val sup = getMultipleActorsAllForOneConf
sup.start
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
(pingpong2 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
(pingpong3 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
@ -286,7 +293,7 @@ class SupervisorSpec extends JUnitSuite {
messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! Die
pingpong2 !! (Die, 100)
}
expect("DIE") {
@ -299,15 +306,15 @@ class SupervisorSpec extends JUnitSuite {
messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
(pingpong2 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
(pingpong3 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
@ -322,7 +329,7 @@ class SupervisorSpec extends JUnitSuite {
}
@Test def shouldOneWayKillSingleActorOneForOne = {
messageLog.clear
clearMessageLogs
val sup = getSingleActorOneForOneSupervisor
sup.start
pingpong1 ! Die
@ -333,7 +340,7 @@ class SupervisorSpec extends JUnitSuite {
}
@Test def shouldOneWayCallKillCallSingleActorOneForOne = {
messageLog.clear
clearMessageLogs
val sup = getSingleActorOneForOneSupervisor
sup.start
pingpong1 ! OneWay
@ -347,27 +354,28 @@ class SupervisorSpec extends JUnitSuite {
messageLog.poll(1, TimeUnit.SECONDS)
}
pingpong1 ! OneWay
expect("oneway") {
oneWayLog.poll(1, TimeUnit.SECONDS)
}
}
/*
@Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = {
messageLog.clear
clearMessageLogs
val sup = getNestedSupervisorsAllForOneConf
sup.start
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
(pingpong2 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
(pingpong3 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
@ -380,11 +388,11 @@ class SupervisorSpec extends JUnitSuite {
messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! Die
pingpong2 !! (Die, 100)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(1 , TimeUnit.SECONDS)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
@ -393,15 +401,15 @@ class SupervisorSpec extends JUnitSuite {
messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
(pingpong1 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
(pingpong2 !! (Ping, 100)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
(pingpong3 !! (Ping, 100)).getOrElse("nil")
}
expect("ping") {
@ -414,9 +422,9 @@ class SupervisorSpec extends JUnitSuite {
messageLog.poll(1, TimeUnit.SECONDS)
}
}
*/
// =============================================
// Creat some supervisors with different configurations
// Create some supervisors with different configurations
def getSingleActorAllForOneSupervisor: Supervisor = {
pingpong1 = newActor[PingPong1Actor].start

View file

@ -9,11 +9,11 @@ import Actor._
object ThreadBasedActorSpec {
class TestActor extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "Hello" =>
reply("World")
self.reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
@ -28,7 +28,7 @@ class ThreadBasedActorSpec extends JUnitSuite {
@Test def shouldSendOneWay {
var oneWay = new CountDownLatch(1)
val actor = newActor(() => new Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "OneWay" => oneWay.countDown
}

View file

@ -12,6 +12,8 @@ import org.junit.{Test, Before}
import se.scalablesolutions.akka.actor.Actor
import Actor._
// FIXME use this test when we have removed the MessageInvoker classes
/*
class ThreadBasedDispatcherSpec extends JUnitSuite {
private var threadingIssueDetected: AtomicBoolean = null
val key1 = newActor(() => new Actor { def receive = { case _ => {}} })
@ -86,3 +88,4 @@ class ThreadBasedDispatcherSpec extends JUnitSuite {
dispatcher.shutdown
}
}
*/

View file

@ -24,10 +24,10 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe
@BeanProperty var clusterName = ""
@BeanProperty var broadcaster : Broadcaster = null
override def init : Unit = ()
/** Stops the actor */
def destroy : Unit = stop
/**
* Stops the actor
*/
def destroy: Unit = self.stop
/**
* Relays all non ClusterCometBroadcast messages to the other AkkaClusterBroadcastFilters in the cluster
@ -45,10 +45,10 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe
def receive = {
//Only handle messages intended for this particular instance
case b@ClusterCometBroadcast(c,_) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b
case b @ ClusterCometBroadcast(c, _) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b
case _ =>
}
//Since this class is instantiated by Atmosphere, we need to make sure it's started
start
self.start
}

View file

@ -192,10 +192,10 @@ trait AuthenticationActor[C <: Credentials] extends Actor {
verify(extractCredentials(req)) match {
case Some(u: UserInfo) => {
req.setSecurityContext(mkSecurityContext(req, u))
if (roles.exists(req.isUserInRole(_))) reply(OK)
else reply(Response.status(Response.Status.FORBIDDEN).build)
if (roles.exists(req.isUserInRole(_))) self.reply(OK)
else self.reply(Response.status(Response.Status.FORBIDDEN).build)
}
case _ => reply(unauthorized)
case _ => self.reply(unauthorized)
}
}
}

View file

@ -26,7 +26,7 @@ case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: ActorRef)
class CassandraPersistentActor extends Transactor {
timeout = 100000
self.timeout = 100000
private lazy val mapState = CassandraStorage.newMap
private lazy val vectorState = CassandraStorage.newVector
@ -34,31 +34,31 @@ class CassandraPersistentActor extends Transactor {
def receive = {
case GetMapState(key) =>
reply(mapState.get(key.getBytes("UTF-8")).get)
self.reply(mapState.get(key.getBytes("UTF-8")).get)
case GetVectorSize =>
reply(vectorState.length.asInstanceOf[AnyRef])
self.reply(vectorState.length.asInstanceOf[AnyRef])
case GetRefState =>
reply(refState.get.get)
self.reply(refState.get.get)
case SetMapState(key, msg) =>
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
reply(msg)
self.reply(msg)
case SetVectorState(msg) =>
vectorState.add(msg.getBytes("UTF-8"))
reply(msg)
self.reply(msg)
case SetRefState(msg) =>
refState.swap(msg.getBytes("UTF-8"))
reply(msg)
self.reply(msg)
case Success(key, msg) =>
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
vectorState.add(msg.getBytes("UTF-8"))
refState.swap(msg.getBytes("UTF-8"))
reply(msg)
self.reply(msg)
case Failure(key, msg, failer) =>
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
vectorState.add(msg.getBytes("UTF-8"))
refState.swap(msg.getBytes("UTF-8"))
failer !! "Failure"
reply(msg)
self.reply(msg)
}
}

View file

@ -39,7 +39,7 @@ class BankAccountActor extends Transactor {
// check balance
case Balance(accountNo) =>
txnLog.add("Balance:" + accountNo)
reply(accountState.get(accountNo).get)
self.reply(accountState.get(accountNo).get)
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
@ -54,7 +54,7 @@ class BankAccountActor extends Transactor {
accountState.put(accountNo, (m - amount))
if (amount > m)
failer !! "Failure"
reply(m - amount)
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
@ -72,7 +72,7 @@ class BankAccountActor extends Transactor {
accountState.put(accountNo, (m - bal))
}
if (bal > m) failer !! "Failure"
reply(m - bal)
self.reply(m - bal)
// credit amount
case Credit(accountNo, amount) =>
@ -85,13 +85,13 @@ class BankAccountActor extends Transactor {
case None => 0
}
accountState.put(accountNo, (m + amount))
reply(m + amount)
self.reply(m + amount)
case LogSize =>
reply(txnLog.length.asInstanceOf[AnyRef])
self.reply(txnLog.length.asInstanceOf[AnyRef])
case Log(start, finish) =>
reply(txnLog.slice(start, finish))
self.reply(txnLog.slice(start, finish))
}
}

View file

@ -16,19 +16,19 @@ class Subscriber(client: RedisClient) extends Actor {
def receive = {
case Subscribe(channels) =>
client.subscribe(channels.head, channels.tail: _*)(callback)
reply(true)
self.reply(true)
case Register(cb) =>
callback = cb
reply(true)
self.reply(true)
case Unsubscribe(channels) =>
client.unsubscribe(channels.head, channels.tail: _*)
reply(true)
self.reply(true)
case UnsubscribeAll =>
client.unsubscribe
reply(true)
self.reply(true)
}
}
@ -36,7 +36,7 @@ class Publisher(client: RedisClient) extends Actor {
def receive = {
case Publish(channel, message) =>
client.publish(channel, message)
reply(true)
self.reply(true)
}
}

View file

@ -35,7 +35,7 @@ class AccountActor extends Transactor {
// check balance
case Balance(accountNo) =>
txnLog.add("Balance:%s".format(accountNo).getBytes)
reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
self.reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
@ -49,7 +49,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
if (amount > m)
failer !! "Failure"
reply(m - amount)
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
@ -67,7 +67,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - bal).toString.getBytes)
}
if (bal > m) failer !! "Failure"
reply(m - bal)
self.reply(m - bal)
// credit amount
case Credit(accountNo, amount) =>
@ -79,13 +79,13 @@ class AccountActor extends Transactor {
case None => 0
}
accountState.put(accountNo.getBytes, (m + amount).toString.getBytes)
reply(m + amount)
self.reply(m + amount)
case LogSize =>
reply(txnLog.length.asInstanceOf[AnyRef])
self.reply(txnLog.length.asInstanceOf[AnyRef])
case Log(start, finish) =>
reply(txnLog.slice(start, finish))
self.reply(txnLog.slice(start, finish))
}
}

View file

@ -25,12 +25,12 @@ class QueueActor extends Transactor {
// enqueue
case NQ(accountNo) =>
accounts.enqueue(accountNo.getBytes)
reply(true)
self.reply(true)
// dequeue
case DQ =>
val d = new String(accounts.dequeue)
reply(d)
self.reply(d)
// multiple NQ and DQ
case MNDQ(enqs, no, failer) =>
@ -41,11 +41,11 @@ class QueueActor extends Transactor {
case e: Exception =>
failer !! "Failure"
}
reply(true)
self.reply(true)
// size
case SZ =>
reply(accounts.size)
self.reply(accounts.size)
}
}

View file

@ -46,26 +46,26 @@ case class RANGE(start: Int, end: Int)
case class MULTI(add: List[Hacker], rem: List[Hacker], failer: ActorRef)
class SortedSetActor extends Transactor {
timeout = 100000
self.timeout = 100000
private lazy val hackers = RedisStorage.newSortedSet
def receive = {
case ADD(h) =>
hackers.+(h.name.getBytes, h.zscore)
reply(true)
self.reply(true)
case REMOVE(h) =>
hackers.-(h.name.getBytes)
reply(true)
self.reply(true)
case SIZE =>
reply(hackers.size)
self.reply(hackers.size)
case SCORE(h) =>
reply(hackers.zscore(h.name.getBytes))
self.reply(hackers.zscore(h.name.getBytes))
case RANGE(s, e) =>
reply(hackers.zrange(s, e))
self.reply(hackers.zrange(s, e))
case MULTI(a, r, failer) =>
a.foreach{ h: Hacker =>
@ -81,7 +81,7 @@ class SortedSetActor extends Transactor {
case e: Exception =>
failer !! "Failure"
}
reply((a.size, r.size))
self.reply((a.size, r.size))
}
}

View file

@ -12,7 +12,7 @@ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote1"
protected def receive = {
case msg: Message => reply(Message("hello %s" format msg.body, Map("sender" -> "remote1")))
case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote1")))
}
}
@ -23,7 +23,7 @@ class RemoteActor2 extends Actor with Consumer {
def endpointUri = "jetty:http://localhost:6644/remote2"
protected def receive = {
case msg: Message => reply(Message("hello %s" format msg.body, Map("sender" -> "remote2")))
case msg: Message => self.reply(Message("hello %s" format msg.body, Map("sender" -> "remote2")))
}
}
@ -47,7 +47,7 @@ class Consumer1 extends Actor with Consumer with Logging {
@consume("jetty:http://0.0.0.0:8877/camel/test1")
class Consumer2 extends Actor {
def receive = {
case msg: Message => reply("Hello %s" format msg.bodyAs(classOf[String]))
case msg: Message => self.reply("Hello %s" format msg.bodyAs(classOf[String]))
}
}
@ -74,7 +74,7 @@ class Subscriber(name:String, uri: String) extends Actor with Consumer with Logg
}
class Publisher(name: String, uri: String) extends Actor with Producer {
id = name
self.id = name
def endpointUri = uri
override def oneway = true
protected def receive = produce
@ -86,7 +86,7 @@ class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consu
protected def receive = {
case msg: Message => {
publisher ! msg.bodyAs(classOf[String])
reply("message published")
self.reply("message published")
}
}
}

View file

@ -59,9 +59,8 @@ class Boot {
val jmsSubscriber2 = newActor(() => new Subscriber("jms-subscriber-2", jmsUri)).start
val jmsPublisher = newActor(() => new Publisher("jms-publisher", jmsUri)).start
//val cometdPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher).start
val jmsPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher).start
//val cometdPublisherBridge = newActor(() => new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start
val jmsPublisherBridge = newActor(() => new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start
}
class CustomRouteBuilder extends RouteBuilder {

View file

@ -97,7 +97,7 @@ trait ChatStorage extends Actor
* Redis-backed chat storage implementation.
*/
class RedisChatStorage extends ChatStorage {
lifeCycle = Some(LifeCycle(Permanent))
self.lifeCycle = Some(LifeCycle(Permanent))
val CHAT_LOG = "akka.chat.log"
private var chatLog = atomic { RedisStorage.getVector(CHAT_LOG) }
@ -111,7 +111,7 @@ class RedisChatStorage extends ChatStorage {
case GetChatLog(_) =>
val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList }
reply(ChatLog(messageList))
self.reply(ChatLog(messageList))
}
override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)
@ -163,15 +163,15 @@ trait ChatManagement { this: Actor =>
* Creates and links a RedisChatStorage.
*/
trait RedisChatStorageFactory { this: Actor =>
val storage = spawnLink[RedisChatStorage] // starts and links ChatStorage
val storage = this.self.spawnLink[RedisChatStorage] // starts and links ChatStorage
}
/**
* Chat server. Manages sessions and redirects all other messages to the Session for the client.
*/
trait ChatServer extends Actor {
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Exception])
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.trapExit = List(classOf[Exception])
val storage: ActorRef
@ -188,7 +188,7 @@ trait ChatServer extends Actor {
override def shutdown = {
log.info("Chat server is shutting down...")
shutdownSessions
unlink(storage)
self.unlink(storage)
storage.stop
}
}

View file

@ -32,11 +32,11 @@ class SimpleService extends Transactor {
case Tick => if (hasStartedTicking) {
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
storage.put(KEY, new Integer(counter + 1))
reply(<h1>Tick: {counter + 1}</h1>)
self.reply(<h1>Tick: {counter + 1}</h1>)
} else {
storage.put(KEY, new Integer(0))
hasStartedTicking = true
reply(<h1>Tick: 0</h1>)
self.reply(<h1>Tick: 0</h1>)
}
}
}
@ -65,11 +65,11 @@ class PersistentSimpleService extends Transactor {
val bytes = storage.get(KEY.getBytes).get
val counter = ByteBuffer.wrap(bytes).getInt
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
reply(<success>Tick:{counter + 1}</success>)
self.reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(0).array)
hasStartedTicking = true
reply(<success>Tick: 0</success>)
self.reply(<success>Tick: 0</success>)
}
}
}

View file

@ -10,16 +10,14 @@ import se.scalablesolutions.akka.remote.RemoteNode
import se.scalablesolutions.akka.util.Logging
class RemoteHelloWorldActor extends RemoteActor("localhost", 9999) {
start
def receive = {
case "Hello" =>
log.info("Received 'Hello'")
reply("World")
self.reply("World")
}
}
object ClientManagedRemoteActorServer extends Logging {
def run = {
RemoteNode.start("localhost", 9999)
log.info("Remote node started")
@ -31,7 +29,7 @@ object ClientManagedRemoteActorServer extends Logging {
object ClientManagedRemoteActorClient extends Logging {
def run = {
val actor = newActor[RemoteHelloWorldActor]
val actor = newActor[RemoteHelloWorldActor].start
log.info("Remote actor created, moved to the server")
log.info("Sending 'Hello' to remote actor")
val result = actor !! "Hello"

View file

@ -10,11 +10,10 @@ import se.scalablesolutions.akka.remote.{RemoteClient, RemoteNode}
import se.scalablesolutions.akka.util.Logging
class HelloWorldActor extends Actor {
start
def receive = {
case "Hello" =>
log.info("Received 'Hello'")
reply("World")
self.reply("World")
}
}
@ -23,7 +22,7 @@ object ServerManagedRemoteActorServer extends Logging {
def run = {
RemoteNode.start("localhost", 9999)
log.info("Remote node started")
RemoteNode.register("hello-service", newActor[HelloWorldActor])
RemoteNode.register("hello-service", newActor[HelloWorldActor].start)
log.info("Remote actor registered and started")
}

View file

@ -65,11 +65,11 @@ class SimpleService extends Transactor {
case Tick => if (hasStartedTicking) {
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
storage.put(KEY, new Integer(counter + 1))
reply(<success>Tick:{counter + 1}</success>)
self.reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY, new Integer(0))
hasStartedTicking = true
reply(<success>Tick: 0</success>)
self.reply(<success>Tick: 0</success>)
}
}
}
@ -118,11 +118,11 @@ class PersistentSimpleService extends Transactor {
val bytes = storage.get(KEY.getBytes).get
val counter = ByteBuffer.wrap(bytes).getInt
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
reply(<success>Tick:{counter + 1}</success>)
self.reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY.getBytes, Array(0.toByte))
hasStartedTicking = true
reply(<success>Tick: 0</success>)
self.reply(<success>Tick: 0</success>)
}
}
}
@ -139,8 +139,8 @@ class Chat extends Actor with Logging {
def receive = {
case Chat(who, what, msg) => {
what match {
case "login" => reply("System Message__" + who + " has joined.")
case "post" => reply("" + who + "__" + msg)
case "login" => self.reply("System Message__" + who + " has joined.")
case "post" => self.reply("" + who + "__" + msg)
case _ => throw new WebApplicationException(422)
}
}

View file

@ -4,7 +4,7 @@
package sample.security
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Transactor, Actor}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
@ -90,8 +90,7 @@ import javax.annotation.security.{RolesAllowed, DenyAll, PermitAll}
import javax.ws.rs.{GET, Path, Produces}
@Path("/secureticker")
class SecureTickActor extends Actor with Logging {
makeTransactionRequired
class SecureTickActor extends Transactor with Logging {
case object Tick
private val KEY = "COUNTER"
@ -135,11 +134,11 @@ class SecureTickActor extends Actor with Logging {
case Tick => if (hasStartedTicking) {
val counter = storage.get(KEY).get.intValue
storage.put(KEY, counter + 1)
reply(new Integer(counter + 1))
self.reply(new Integer(counter + 1))
} else {
storage.put(KEY, 0)
hasStartedTicking = true
reply(new Integer(0))
self.reply(new Integer(0))
}
}
}

View file

@ -42,7 +42,7 @@
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath).
timeout = 60000
self.timeout = 60000
</jta>
<rest>

View file

@ -31,7 +31,7 @@
else
alert(data);
}
}
};
$('#send').click(function(e) {
var message = $('#msg').val();

View file

@ -70,7 +70,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
// ------------------------------------------------------------
// create executable jar
// Run Akka microkernel using 'sbt run' + use for packaging executable JAR
override def mainClass = Some("se.scalablesolutions.akka.kernel.Main")
override def packageOptions =

View file

@ -1,5 +1,6 @@
import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
// val surefire = "bryanjswift" % "sbt-surefire-reporting" % "0.0.3-SNAPSHOT"
// val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/"
// val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT"
}