akka-core now compiles

This commit is contained in:
Jonas Bonér 2010-05-01 12:59:24 +02:00
parent 2ea646db74
commit d396961f76
52 changed files with 755 additions and 1100 deletions

View file

@ -7,7 +7,8 @@ package se.scalablesolutions.akka.amqp
import com.rabbitmq.client.{AMQP => RabbitMQ, _}
import com.rabbitmq.client.ConnectionFactory
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{Actor, ActorID}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.{HashCode, Logging}
@ -50,7 +51,7 @@ object AMQP {
exchangeName: String,
returnListener: Option[ReturnListener],
shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long) =
initReconnectDelay: Long): ActorID =
supervisor.newProducer(
config, hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay)
@ -65,7 +66,7 @@ object AMQP {
passive: Boolean,
durable: Boolean,
autoDelete: Boolean,
configurationArguments: Map[String, AnyRef]) =
configurationArguments: Map[String, AnyRef]): ActorID =
supervisor.newConsumer(
config, hostname, port, exchangeName, exchangeType,
shutdownListener, initReconnectDelay,
@ -92,14 +93,14 @@ object AMQP {
exchangeName: String,
returnListener: Option[ReturnListener],
shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long): Producer = {
val producer = new Producer(
initReconnectDelay: Long): ActorID = {
val producer = newActor(() => new Producer(
new ConnectionFactory(config),
hostname, port,
exchangeName,
returnListener,
shutdownListener,
initReconnectDelay)
initReconnectDelay))
startLink(producer)
producer
}
@ -115,8 +116,8 @@ object AMQP {
passive: Boolean,
durable: Boolean,
autoDelete: Boolean,
configurationArguments: Map[String, AnyRef]): Consumer = {
val consumer = new Consumer(
configurationArguments: Map[String, AnyRef]): ActorID = {
val consumer = newActor(() => new Consumer(
new ConnectionFactory(config),
hostname, port,
exchangeName,
@ -126,7 +127,7 @@ object AMQP {
passive,
durable,
autoDelete,
configurationArguments)
configurationArguments))
startLink(consumer)
consumer
}
@ -188,11 +189,11 @@ object AMQP {
val exclusive: Boolean,
val autoDelete: Boolean,
val isUsingExistingQueue: Boolean,
val actor: Actor) extends AMQPMessage {
val actor: ActorID) extends AMQPMessage {
/**
* Creates a non-exclusive, non-autodelete message listener.
*/
def this(queueName: String, routingKey: String, actor: Actor) = this (queueName, routingKey, false, false, false, actor)
def this(queueName: String, routingKey: String, actor: ActorID) = this (queueName, routingKey, false, false, false, actor)
private[akka] var tag: Option[String] = None
@ -241,12 +242,12 @@ object AMQP {
exclusive: Boolean,
autoDelete: Boolean,
isUsingExistingQueue: Boolean,
actor: Actor) =
actor: ActorID) =
new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor)
def apply(queueName: String,
routingKey: String,
actor: Actor) =
actor: ActorID) =
new MessageConsumerListener(queueName, routingKey, false, false, false, actor)
}

View file

@ -10,7 +10,7 @@ import org.apache.camel.{Processor, ExchangePattern, Exchange, ProducerTemplate}
import org.apache.camel.impl.DefaultExchange
import org.apache.camel.spi.Synchronization
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{Actor, ActorID}
import se.scalablesolutions.akka.dispatch.CompletableFuture
import se.scalablesolutions.akka.util.Logging
@ -162,7 +162,7 @@ trait Producer { self: Actor =>
*/
class ProducerResponseSender(
headers: Map[String, Any],
replyTo : Option[Either[Actor,CompletableFuture[Any]]],
replyTo : Option[Either[ActorID, CompletableFuture[Any]]],
producer: Actor) extends Synchronization with Logging {
implicit val producerActor = Some(producer) // the response sender

View file

@ -11,7 +11,7 @@ import java.util.concurrent.TimeoutException
import org.apache.camel.{Exchange, Consumer, Processor}
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorID}
import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message}
/**
@ -106,7 +106,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
* Send the exchange in-message to the given actor using the ! operator. The message
* send to the actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOnly(exchange: Exchange, actor: Actor): Unit =
protected def processInOnly(exchange: Exchange, actor: ActorID): Unit =
actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
/**
@ -114,7 +114,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
* out-message is populated from the actor's reply message. The message sent to the
* actor is of type se.scalablesolutions.akka.camel.Message.
*/
protected def processInOut(exchange: Exchange, actor: Actor) {
protected def processInOut(exchange: Exchange, actor: ActorID) {
val header = Map(Message.MessageExchangeId -> exchange.getExchangeId)
val result: Any = actor !! exchange.toRequestMessage(header)
@ -128,7 +128,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
}
}
private def target: Option[Actor] =
private def target: Option[ActorID] =
if (ep.id.isDefined) targetById(ep.id.get)
else targetByUuid(ep.uuid.get)

View file

@ -5,6 +5,7 @@
package se.scalablesolutions.akka.camel.service
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.CamelContextManager
import se.scalablesolutions.akka.util.{Bootable, Logging}
@ -17,11 +18,10 @@ import se.scalablesolutions.akka.util.{Bootable, Logging}
*/
trait CamelService extends Bootable with Logging {
import se.scalablesolutions.akka.actor.Actor.Sender.Self
import CamelContextManager._
private[camel] val consumerPublisher = new ConsumerPublisher
private[camel] val publishRequestor = new PublishRequestor(consumerPublisher)
private[camel] val consumerPublisher = newActor[ConsumerPublisher]
private[camel] val publishRequestor = newActor(() => new PublishRequestor(consumerPublisher))
/**
* Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously)

View file

@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor}
import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorID}
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
import se.scalablesolutions.akka.util.Logging
@ -81,7 +81,7 @@ class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends
*
* @author Martin Krasser
*/
class PublishRequestor(consumerPublisher: Actor) extends Actor {
class PublishRequestor(consumerPublisher: ActorID) extends Actor {
protected def receive = {
case ActorUnregistered(actor) => { /* ignore */ }
case ActorRegistered(actor) => Publish.forConsumer(actor) match {
@ -112,24 +112,24 @@ object Publish {
* Creates a list of Publish request messages for all consumer actors in the <code>actors</code>
* list.
*/
def forConsumers(actors: List[Actor]): List[Publish] =
def forConsumers(actors: List[ActorID]): List[Publish] =
for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get
/**
* Creates a Publish request message if <code>actor</code> is a consumer actor.
*/
def forConsumer(actor: Actor): Option[Publish] =
def forConsumer(actor: ActorID): Option[Publish] =
forConsumeAnnotated(actor) orElse forConsumerType(actor)
private def forConsumeAnnotated(actor: Actor): Option[Publish] = {
private def forConsumeAnnotated(actor: ActorID): Option[Publish] = {
val annotation = actor.getClass.getAnnotation(classOf[consume])
if (annotation eq null) None
else if (actor._remoteAddress.isDefined) None // do not publish proxies
else if (actor.remoteAddress.isDefined) None // do not publish proxies
else Some(Publish(annotation.value, actor.getId, false))
}
private def forConsumerType(actor: Actor): Option[Publish] =
private def forConsumerType(actor: ActorID): Option[Publish] =
if (!actor.isInstanceOf[Consumer]) None
else if (actor._remoteAddress.isDefined) None
else if (actor.remoteAddress.isDefined) None
else Some(Publish(actor.asInstanceOf[Consumer].endpointUri, actor.uuid, true))
}

View file

@ -6,7 +6,7 @@ import org.apache.camel.component.mock.MockEndpoint
import org.scalatest.{GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.actor.Actor.Sender.Self
import se.scalablesolutions.akka.actor.Actor._
class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
override protected def beforeAll = {
@ -27,7 +27,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
scenario("produce message sync and receive response") {
given("a registered synchronous two-way producer for endpoint direct:producer-test-2")
val producer = new TestProducer("direct:producer-test-2") with Sync
val producer = newActor(() => new TestProducer("direct:producer-test-2") with Sync)
producer.start
when("a test message is sent to the producer")
@ -41,7 +41,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
scenario("produce message async and receive response") {
given("a registered asynchronous two-way producer for endpoint direct:producer-test-2")
val producer = new TestProducer("direct:producer-test-2")
val producer = newActor(() => new TestProducer("direct:producer-test-2"))
producer.start
when("a test message is sent to the producer")
@ -55,7 +55,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
scenario("produce message sync and receive failure") {
given("a registered synchronous two-way producer for endpoint direct:producer-test-2")
val producer = new TestProducer("direct:producer-test-2") with Sync
val producer = newActor(() => new TestProducer("direct:producer-test-2") with Sync)
producer.start
when("a fail message is sent to the producer")
@ -71,7 +71,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
scenario("produce message async and receive failure") {
given("a registered asynchronous two-way producer for endpoint direct:producer-test-2")
val producer = new TestProducer("direct:producer-test-2")
val producer = newActor(() => new TestProducer("direct:producer-test-2"))
producer.start
when("a fail message is sent to the producer")
@ -87,7 +87,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
scenario("produce message sync oneway") {
given("a registered synchronous one-way producer for endpoint direct:producer-test-1")
val producer = new TestProducer("direct:producer-test-1") with Sync with Oneway
val producer = newActor(() => new TestProducer("direct:producer-test-1") with Sync with Oneway)
producer.start
when("a test message is sent to the producer")
@ -100,7 +100,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
scenario("produce message async oneway") {
given("a registered asynchronous one-way producer for endpoint direct:producer-test-1")
val producer = new TestProducer("direct:producer-test-1") with Oneway
val producer = newActor(() => new TestProducer("direct:producer-test-1") with Oneway)
producer.start
when("a test message is sent to the producer")

View file

@ -5,6 +5,7 @@ import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import se.scalablesolutions.akka.camel.{CamelContextManager, Message, Consumer}
import Actor._
class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen {
var service: CamelService = CamelService.newInstance
@ -17,11 +18,11 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
CamelContextManager.init
CamelContextManager.context.addRoutes(new TestRoute)
// set expectations for testing purposes
service.consumerPublisher.expectPublishCount(1)
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
// start the CamelService
service.load
// await publication of first test consumer
service.consumerPublisher.awaitPublish
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
}
override protected def afterAll = {
@ -34,11 +35,11 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access registered consumer actors via Camel direct-endpoints") {
given("two consumer actors registered before and after CamelService startup")
service.consumerPublisher.expectPublishCount(1)
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1)
new TestConsumer("direct:publish-test-2").start
when("requests are sent to these actors")
service.consumerPublisher.awaitPublish
service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish
val response1 = CamelContextManager.template.requestBody("direct:publish-test-1", "msg1")
val response2 = CamelContextManager.template.requestBody("direct:publish-test-2", "msg2")
@ -53,7 +54,7 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
scenario("access an actor from the custom Camel route") {
given("a registered actor and a custom route to that actor")
val actor = new TestActor().start
val actor = newActor[TestActor].start
when("sending a a message to that route")
val response = CamelContextManager.template.requestBody("direct:custom-route-test-1", "msg3")

View file

@ -6,22 +6,23 @@ import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.camel.Consumer
import se.scalablesolutions.akka.camel.support.{Receive, Countdown}
import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRegistered, Actor}
import Actor._
class PublishRequestorTest extends JUnitSuite {
@After def tearDown = ActorRegistry.shutdownAll
@Test def shouldReceivePublishRequestOnActorRegisteredEvent = {
val consumer = new Actor with Consumer {
val consumer = newActor(() => new Actor with Consumer {
def endpointUri = "mock:test"
protected def receive = null
}
val publisher = new PublisherMock with Countdown[Publish]
val requestor = new PublishRequestor(publisher)
})
val publisher = newActor(() => new PublisherMock with Countdown[Publish])
val requestor = newActor(() => new PublishRequestor(publisher))
publisher.start
requestor.start
requestor.!(ActorRegistered(consumer))(None)
publisher.waitFor
assert(publisher.received === Publish("mock:test", consumer.uuid, true))
publisher.actor.asInstanceOf[Countdown[Publish]].waitFor
assert(publisher.actor.asInstanceOf[PublisherMock].received === Publish("mock:test", consumer.uuid, true))
publisher.stop
requestor.stop
}

View file

@ -4,29 +4,30 @@ import org.junit.Test
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.Consumer
class PublishTest extends JUnitSuite {
@Test def shouldCreatePublishRequestList = {
val publish = Publish.forConsumers(List(new ConsumeAnnotatedActor))
val publish = Publish.forConsumers(List(newActor[ConsumeAnnotatedActor]))
assert(publish === List(Publish("mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorId = {
val publish = Publish.forConsumer(new ConsumeAnnotatedActor)
val publish = Publish.forConsumers(List(newActor[ConsumeAnnotatedActor]))
assert(publish === Some(Publish("mock:test1", "test", false)))
}
@Test def shouldCreateSomePublishRequestWithActorUuid = {
val actor = new ConsumerActor
val publish = Publish.forConsumer(actor)
assert(publish === Some(Publish("mock:test2", actor.uuid, true)))
assert(publish === Some(Publish("mock:test2", actor.uuid, true)))
val ca = newActor[ConsumerActor]
val publish = Publish.forConsumers(List(ca))
assert(publish === Some(Publish("mock:test2", ca.uuid, true)))
assert(publish === Some(Publish("mock:test2", ca.uuid, true)))
}
@Test def shouldCreateNone = {
val publish = Publish.forConsumer(new PlainActor)
val publish = Publish.forConsumer(newActor[PlainActor])
assert(publish === None)
}

View file

@ -1,105 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.cluster.shoal
import java.util.Properties
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.remote.{ClusterActor, BasicClusterActor, RemoteServer}
import com.sun.enterprise.ee.cms.core._
import com.sun.enterprise.ee.cms.impl.client._
/**
* Clustering support via Shoal.
*/
class ShoalClusterActor extends BasicClusterActor {
type ADDR_T = String
@volatile protected var gms : Option[GroupManagementService] = None
protected lazy val serverName : String = RemoteServer.HOSTNAME + ":" + RemoteServer.PORT
@volatile private var isActive = false
lazy val topic : String = config.getString("akka.remote.cluster.shoal.topic") getOrElse "akka-messages"
override def init = {
super.init
gms = Some(createGMS)
isActive = true
}
override def shutdown = {
super.shutdown
isActive = false
for(g <- gms) g.shutdown(GMSConstants.shutdownType.INSTANCE_SHUTDOWN)
gms = None
}
/**
* Constructs a Properties instance with properties designated for the underlying
* Shoal cluster transport (JXTA,JGroups)
*/
protected def properties() : Properties = {
config.getConfigMap("akka.remote.cluster.shoal.properties").map( m => {
new Properties(){
for(key <- m.keys) setProperty(key,m(key))
}
}).getOrElse(null)
}
/**
* Creates a GroupManagementService, provides it with the proper properties
* Adds callbacks and boots up the cluster
*/
protected def createGMS : GroupManagementService = {
val g = GMSFactory
.startGMSModule(serverName,name, GroupManagementService.MemberType.CORE, properties())
.asInstanceOf[GroupManagementService]
val callback = createCallback
g.addActionFactory(new JoinNotificationActionFactoryImpl(callback))
g.addActionFactory(new FailureSuspectedActionFactoryImpl(callback))
g.addActionFactory(new FailureNotificationActionFactoryImpl(callback))
g.addActionFactory(new PlannedShutdownActionFactoryImpl(callback))
g.addActionFactory(new MessageActionFactoryImpl(callback), topic)
g.join
g
}
/**
* Creates a CallBack instance that deals with the cluster signalling
*/
protected def createCallback : CallBack = {
import scala.collection.JavaConversions._
import ClusterActor._
val me = this
new CallBack {
def processNotification(signal : Signal) {
try {
signal.acquire()
if(isActive) {
signal match {
case ms : MessageSignal => me ! Message[ADDR_T](ms.getMemberToken,ms.getMessage)
case jns : JoinNotificationSignal => me ! View[ADDR_T](Set[ADDR_T]() ++ jns.getCurrentCoreMembers - serverName)
case fss : FailureSuspectedSignal => me ! Zombie[ADDR_T](fss.getMemberToken)
case fns : FailureNotificationSignal => me ! Zombie[ADDR_T](fns.getMemberToken)
case _ => log.debug("Unhandled signal: [%s]",signal)
}
}
signal.release()
} catch {
case e : SignalAcquireException => log.warning(e,"SignalAcquireException")
case e : SignalReleaseException => log.warning(e,"SignalReleaseException")
}
}
}
}
protected def toOneNode(dest : ADDR_T, msg : Array[Byte]) : Unit =
for(g <- gms) g.getGroupHandle.sendMessage(dest,topic, msg)
protected def toAllNodes(msg : Array[Byte]) : Unit =
for(g <- gms) g.getGroupHandle.sendMessage(topic, msg)
}

View file

@ -5,15 +5,12 @@
package se.scalablesolutions.akka.comet
import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource}
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
name = classOf[AkkaBroadcaster].getName
val caster = new Actor {
def receive = { case f : Function0[_] => f() }
start
}
val caster = actor { case f : Function0[_] => f() }
override def destroy {
super.destroy

View file

@ -180,7 +180,8 @@ object ActiveObject {
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long,
dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
@ -194,7 +195,8 @@ object ActiveObject {
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean,
dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
@ -208,7 +210,8 @@ object ActiveObject {
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean,
dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
@ -222,7 +225,8 @@ object ActiveObject {
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher,
hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
@ -236,35 +240,40 @@ object ActiveObject {
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher,
hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean,
dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher,
hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean,
dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean,
dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
@ -274,11 +283,10 @@ object ActiveObject {
val proxy = Proxy.newInstance(target, false, false)
actor.initialize(target, proxy)
actor.timeout = timeout
if (remoteAddress.isDefined) {
actor.makeRemote(remoteAddress.get)
}
AspectInitRegistry.register(proxy, AspectInit(target, actor, remoteAddress, timeout))
actor.start
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val actorId = new ActorID(() => actor)
AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout))
actorId.start
proxy.asInstanceOf[T]
}
@ -286,20 +294,18 @@ object ActiveObject {
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
actor.initialize(target.getClass, target)
actor.timeout = timeout
if (remoteAddress.isDefined) {
actor.makeRemote(remoteAddress.get)
}
AspectInitRegistry.register(proxy, AspectInit(intf, actor, remoteAddress, timeout))
actor.start
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val actorId = new ActorID(() => actor)
AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout))
actorId.start
proxy.asInstanceOf[T]
}
/**
* Get the underlying dispatcher actor for the given active object.
*/
def actorFor(obj: AnyRef): Option[Actor] = {
ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj))
}
def actorFor(obj: AnyRef): Option[ActorID] =
ActorRegistry.actorsFor(classOf[Dispatcher]).find(a => a.actor.asInstanceOf[Dispatcher].target == Some(obj))
/**
* Links an other active object to this active object.
@ -382,10 +388,10 @@ private[akka] object AspectInitRegistry {
private[akka] sealed case class AspectInit(
val target: Class[_],
val actor: Dispatcher,
val actorId: ActorID,
val remoteAddress: Option[InetSocketAddress],
val timeout: Long) {
def this(target: Class[_],actor: Dispatcher, timeout: Long) = this(target, actor, None, timeout)
def this(target: Class[_], actorId: ActorID, timeout: Long) = this(target, actorId, None, timeout)
}
/**
@ -399,7 +405,7 @@ private[akka] sealed case class AspectInit(
private[akka] sealed class ActiveObjectAspect {
@volatile private var isInitialized = false
private var target: Class[_] = _
private var actor: Dispatcher = _
private var actorId: ActorID = _
private var remoteAddress: Option[InetSocketAddress] = _
private var timeout: Long = _
@ -408,7 +414,7 @@ private[akka] sealed class ActiveObjectAspect {
if (!isInitialized) {
val init = AspectInitRegistry.initFor(joinPoint.getThis)
target = init.target
actor = init.actor
actorId = init.actorId
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
@ -424,10 +430,10 @@ private[akka] sealed class ActiveObjectAspect {
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
if (isOneWay(rtti)) {
(actor ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef]
(actorId ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef]
}
else {
val result = actor !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
val result = actorId !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
}
@ -441,13 +447,13 @@ private[akka] sealed class ActiveObjectAspect {
.setId(RemoteRequestIdFactory.nextId)
.setMethod(rtti.getMethod.getName)
.setTarget(target.getName)
.setUuid(actor.uuid)
.setUuid(actorId.uuid)
.setTimeout(timeout)
.setIsActor(false)
.setIsOneWay(oneWay_?)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = actor.registerSupervisorAsRemoteActor
val id = actorId.actor.registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val remoteMessage = requestBuilder.build
val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None)
@ -513,8 +519,8 @@ private[akka] sealed class ActiveObjectAspect {
}
}
// Jan Kronquist: started work on issue 121
private[akka] case class Link(val actor: Actor)
// FIXME Jan Kronquist: started work on issue 121
private[akka] case class Link(val actor: ActorID)
object Dispatcher {
val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()

View file

@ -47,6 +47,7 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
makeRemote(hostname, port)
}
// Life-cycle messages for the Actors
@serializable sealed trait LifeCycleMessage
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage
case class Restart(reason: Throwable) extends LifeCycleMessage
@ -55,30 +56,231 @@ case class Unlink(child: ActorID) extends LifeCycleMessage
case class UnlinkAndStop(child: ActorID) extends LifeCycleMessage
case object Kill extends LifeCycleMessage
// Exceptions for Actors
class ActorKilledException private[akka](message: String) extends RuntimeException(message)
class ActorInitializationException private[akka](message: String) extends RuntimeException(message)
sealed abstract class DispatcherType
object DispatcherType {
case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType
case object EventBasedSingleThreadDispatcher extends DispatcherType
case object EventBasedThreadPoolDispatcher extends DispatcherType
case object ThreadBasedDispatcher extends DispatcherType
/**
* Utility class with factory methods for creating Actors.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
// FIXME remove next release
object Sender {
@deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'")
object Self
}
/**
* Creates a new ActorID out of the Actor with type T.
* <pre>
* import Actor._
* val actor = newActor[MyActor]
* actor.start
* actor ! message
* actor.stop
* </pre>
*/
def newActor[T <: Actor: Manifest]: ActorID = new ActorID(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Creates a new ActorID out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
* <p/>
* This function should <b>NOT</b> be used for remote actors.
* <pre>
* import Actor._
* val actor = newActor(() => new MyActor)
* actor.start
* actor ! message
* actor.stop
* </pre>
*/
def newActor(factory: () => Actor): ActorID = new ActorID(factory)
/**
* Use to create an anonymous event-driven actor.
* <p/>
* The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = actor {
* case msg => ... // handle message
* }
* </pre>
*/
def actor(body: PartialFunction[Any, Unit]): ActorID =
new ActorID(() => new Actor() {
lifeCycle = Some(LifeCycle(Permanent))
start
def receive: PartialFunction[Any, Unit] = body
})
/**
* Use to create an anonymous transactional event-driven actor.
* <p/>
* The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = transactor {
* case msg => ... // handle message
* }
* </pre>
*/
def transactor(body: PartialFunction[Any, Unit]): ActorID =
new ActorID(() => new Transactor() {
lifeCycle = Some(LifeCycle(Permanent))
start
def receive: PartialFunction[Any, Unit] = body
})
/**
* Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration,
* which means that if the actor is supervised and dies it will *not* be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = temporaryActor {
* case msg => ... // handle message
* }
* </pre>
*/
def temporaryActor(body: PartialFunction[Any, Unit]): ActorID =
new ActorID(() => new Actor() {
lifeCycle = Some(LifeCycle(Temporary))
start
def receive = body
})
/**
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
* <p/>
* The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* val a = Actor.init {
* ... // init stuff
* } receive {
* case msg => ... // handle message
* }
* </pre>
*
*/
def init[A](body: => Unit) = {
def handler[A](body: => Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) =
new ActorID(() => new Actor() {
lifeCycle = Some(LifeCycle(Permanent))
start
body
def receive = handler
})
}
handler(body)
}
/**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed.
* <p/>
* NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
* there is a method 'spawn[ActorType]' in the Actor trait already.
* Example:
* <pre>
* import Actor._
*
* spawn {
* ... // do stuff
* }
* </pre>
*/
def spawn(body: => Unit): Unit = {
case object Spawn
new Actor() {
start
self ! Spawn
def receive = {
case Spawn => body; stop
}
}
}
}
/**
* FIXME document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
def invoke(handle: MessageInvocation) = actor.invoke(handle)
class ActorMessageInvoker(val actorId: ActorID) extends MessageInvoker {
def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle)
}
final class ActorID private[akka] (private[akka] val actor: Actor) {
if (actor eq null) throw new IllegalArgumentException("Actor instance passed to ActorID can not be 'null'")
/**
* ActorID is an immutable and serializable handle to an Actor.
* Create an ActorID for an Actor by using the factory method on the Actor object.
* Here is an example:
* <pre>
* import Actor._
*
* val actor = newActor[MyActor]
* actor.start
* actor ! message
* actor.stop
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class ActorID private[akka] () {
private[akka] var newActorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)
private[akka] def this(clazz: Class[_ <: Actor]) = {
this()
newActorFactory = Left(Some(clazz))
}
private[akka] def this(factory: () => Actor) = {
this()
newActorFactory = Right(Some(factory))
}
lazy val actor: Actor = {
val actor = newActorFactory match {
case Left(Some(clazz)) => clazz.newInstance
case Right(Some(factory)) => factory()
case _ => throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope")
}
if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorID can not be 'null'")
actor
}
/**
* Starts up the actor and its message queue.
*/
def start = actor.start
def start: ActorID = {
actor.start
this
}
/**
* Shuts down the actor its dispatcher and message queue.
@ -251,9 +453,19 @@ final class ActorID private[akka] (private[akka] val actor: Actor) {
*/
def uuid = actor.uuid
override def toString = "ActorID[" + actor.toString + "]"
override def hashCode = actor.hashCode
override def equals(that: AnyRef) = actor.equals(that)
/**
* Returns the remote address for the actor, if any, else None.
*/
def remoteAddress: Option[InetSocketAddress] = actor._remoteAddress
/**
* Returns the default timeout for the actor.
*/
def timeout: Long = actor.timeout
override def toString: String = "ActorID[" + actor.toString + "]"
override def hashCode: Int = actor.hashCode
override def equals(that: Any): Boolean = actor.equals(that)
private[akka] def supervisor_=(sup: Option[ActorID]): Unit = actor._supervisor = sup
@ -267,151 +479,6 @@ final class ActorID private[akka] (private[akka] val actor: Actor) {
private[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler
}
/**
* Utility class with factory methods for creating Actors.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
object Sender {
@deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'")
object Self
}
def newActor[T <: Actor: Manifest]: ActorID = {
val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance
new ActorID(actor)
}
/**
* Use to create an anonymous event-driven actor.
* <p/>
* The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = actor {
* case msg => ... // handle message
* }
* </pre>
*/
def actor(body: PartialFunction[Any, Unit]): ActorID =
new ActorID(new Actor() {
lifeCycle = Some(LifeCycle(Permanent))
start
def receive: PartialFunction[Any, Unit] = body
})
/**
* Use to create an anonymous transactional event-driven actor.
* <p/>
* The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = transactor {
* case msg => ... // handle message
* }
* </pre>
*/
def transactor(body: PartialFunction[Any, Unit]): ActorID =
new ActorID(new Transactor() {
lifeCycle = Some(LifeCycle(Permanent))
start
def receive: PartialFunction[Any, Unit] = body
})
/**
* Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration,
* which means that if the actor is supervised and dies it will *not* be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = temporaryActor {
* case msg => ... // handle message
* }
* </pre>
*/
def temporaryActor(body: PartialFunction[Any, Unit]): ActorID =
new ActorID(new Actor() {
lifeCycle = Some(LifeCycle(Temporary))
start
def receive = body
})
/**
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
* <p/>
* The actor is created with a 'permanent' life-cycle configuration, which means that
* if the actor is supervised and dies it will be restarted.
* <p/>
* The actor is started when created.
* Example:
* <pre>
* val a = Actor.init {
* ... // init stuff
* } receive {
* case msg => ... // handle message
* }
* </pre>
*
*/
def init[A](body: => Unit) = {
def handler[A](body: => Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) =
new ActorID(new Actor() {
lifeCycle = Some(LifeCycle(Permanent))
start
body
def receive = handler
})
}
handler(body)
}
/**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed.
* <p/>
* NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
* there is a method 'spawn[ActorType]' in the Actor trait already.
* Example:
* <pre>
* import Actor._
*
* spawn {
* ... // do stuff
* }
* </pre>
*/
def spawn(body: => Unit): Unit = {
case object Spawn
new Actor() {
start
selfId ! Spawn
def receive = {
case Spawn => body; stop
}
}
}
}
/**
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
@ -429,7 +496,18 @@ trait Actor extends TransactionManagement with Logging {
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString
implicit private[akka] var _actorID: Option[ActorID] = None
/**
* The 'self' field holds the ActorID for this actor.
* Can be used to send messages to itself:
* <pre>
* self ! message
* </pre>
* Note: if you are using the 'self' field in the constructor of the Actor
* then you have to make the fields/operations that are using it 'lazy'.
*/
implicit val self = new ActorID(() => this)
protected implicit val selfOption = Some(self)
// ====================================
// private fields
@ -619,26 +697,20 @@ trait Actor extends TransactionManagement with Logging {
// ==== API ====
// =============
/**
* 'selfId' holds the ActorID for this actor.
*/
def selfId: ActorID =
_actorID.getOrElse(throw new IllegalStateException("ActorID for actor " + toString + " is not available"))
/**
* Starts up the actor and its message queue.
*/
def start: Unit = synchronized {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!_isRunning) {
messageDispatcher.register(selfId)
messageDispatcher.register(self)
messageDispatcher.start
_isRunning = true
init
initTransactionalState
}
Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(selfId)
ActorRegistry.register(self)
}
/**
@ -652,11 +724,11 @@ trait Actor extends TransactionManagement with Logging {
*/
def stop = synchronized {
if (_isRunning) {
messageDispatcher.unregister(selfId)
messageDispatcher.unregister(self)
_isRunning = false
_isShutDown = true
shutdown
ActorRegistry.unregister(selfId)
ActorRegistry.unregister(self)
_remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
}
}
@ -688,14 +760,14 @@ trait Actor extends TransactionManagement with Logging {
* Throws an IllegalStateException if unable to determine what to reply to
*/
protected[this] def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'." +
"\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'." +
"\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
@ -725,9 +797,9 @@ trait Actor extends TransactionManagement with Logging {
*/
def dispatcher_=(md: MessageDispatcher): Unit = synchronized {
if (!_isRunning) {
messageDispatcher.unregister(selfId)
messageDispatcher.unregister(self)
messageDispatcher = md
messageDispatcher.register(selfId)
messageDispatcher.register(self)
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
@ -786,7 +858,7 @@ trait Actor extends TransactionManagement with Logging {
if (actorId.supervisor.isDefined) throw new IllegalStateException(
"Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails")
getLinkedActors.add(actorId)
actorId.supervisor = Some(selfId)
actorId.supervisor = Some(self)
Actor.log.debug("Linking actor [%s] to actor [%s]", actorId, this)
}
@ -907,7 +979,7 @@ trait Actor extends TransactionManagement with Logging {
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
actor.dispatcher = dispatcher
}
new ActorID(actor)
new ActorID(() => actor)
}
protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = {
@ -949,7 +1021,7 @@ trait Actor extends TransactionManagement with Logging {
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None)
} else {
val invocation = new MessageInvocation(selfId, message, sender.map(Left(_)), transactionSet.get)
val invocation = new MessageInvocation(self, message, sender.map(Left(_)), transactionSet.get)
if (messageDispatcher.usesActorMailbox) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
@ -982,7 +1054,7 @@ trait Actor extends TransactionManagement with Logging {
} else {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(selfId, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
val invocation = new MessageInvocation(self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
if (messageDispatcher.usesActorMailbox)
_mailbox.add(invocation)
@ -1027,9 +1099,9 @@ trait Actor extends TransactionManagement with Logging {
_isKilled = true
Actor.log.error(e, "Could not invoke actor [%s]", this)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(selfId, e)
if (_supervisor.isDefined) _supervisor.get ! Exit(self, e)
replyTo match {
case Some(Right(future)) => future.completeWithException(selfId, e)
case Some(Right(future)) => future.completeWithException(self, e)
case _ =>
}
} finally {
@ -1078,7 +1150,7 @@ trait Actor extends TransactionManagement with Logging {
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
replyTo match {
case Some(Right(future)) => future.completeWithException(selfId, e)
case Some(Right(future)) => future.completeWithException(self, e)
case _ =>
}
@ -1086,7 +1158,7 @@ trait Actor extends TransactionManagement with Logging {
if (topLevelTransaction) clearTransactionSet
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(selfId, e)
if (_supervisor.isDefined) _supervisor.get ! Exit(self, e)
} finally {
clearTransaction
if (topLevelTransaction) clearTransactionSet
@ -1137,7 +1209,7 @@ trait Actor extends TransactionManagement with Logging {
Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" +
"\n\tshutting down and unlinking supervisor actor as well [%s].",
actor.id)
_supervisor.foreach(_ ! UnlinkAndStop(selfId))
_supervisor.foreach(_ ! UnlinkAndStop(self))
}
}
}
@ -1155,7 +1227,7 @@ trait Actor extends TransactionManagement with Logging {
private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(self)
Some(_supervisor.get.uuid)
} else None
}
@ -1203,3 +1275,11 @@ trait Actor extends TransactionManagement with Logging {
override def toString = "Actor[" + id + ":" + uuid + "]"
}
sealed abstract class DispatcherType
object DispatcherType {
case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType
case object EventBasedSingleThreadDispatcher extends DispatcherType
case object EventBasedThreadPoolDispatcher extends DispatcherType
case object ThreadBasedDispatcher extends DispatcherType
}

View file

@ -53,13 +53,13 @@ object ActorRegistry extends Logging {
/**
* Finds all actors that are subtypes of the class passed in as the Manifest argument.
*/
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = {
val all = new ListBuffer[T]
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorID] = {
val all = new ListBuffer[ActorID]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actorId = elements.nextElement
if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) {
all += actorId.actor.asInstanceOf[T]
all += actorId
}
}
all.toList
@ -68,17 +68,16 @@ object ActorRegistry extends Logging {
/**
* Finds all actors of the exact type specified by the class passed in as the Class argument.
*/
def actorsFor[T <: Actor](clazz: Class[T]): List[T] = {
if (actorsByClassName.containsKey(clazz.getName)) {
actorsByClassName.get(clazz.getName).asInstanceOf[List[T]]
} else Nil
def actorsFor[T <: Actor](clazz: Class[T]): List[ActorID] = {
if (actorsByClassName.containsKey(clazz.getName)) actorsByClassName.get(clazz.getName)
else Nil
}
/**
* Finds all actors that has a specific id.
*/
def actorsFor(id: String): List[ActorID] = {
if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[ActorID]]
if (actorsById.containsKey(id)) actorsById.get(id)
else Nil
}

View file

@ -105,17 +105,17 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
private lazy val value = Ref[T]()
this ! Value(initialValue)
self ! Value(initialValue)
/**
* Periodically handles incoming messages.
*/
def receive = {
case Value(v: T) =>
case Value(v: T) =>
swap(v)
case Function(fun: (T => T)) =>
case Function(fun: (T => T)) =>
swap(fun(value.getOrWait))
case Procedure(proc: (T => Unit)) =>
case Procedure(proc: (T => Unit)) =>
proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null"))))
}
@ -157,22 +157,22 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
/**
* Submits the provided function for execution against the internal agent's state.
*/
final def apply(message: (T => T)): Unit = this ! Function(message)
final def apply(message: (T => T)): Unit = self ! Function(message)
/**
* Submits a new value to be set as the new agent's internal state.
*/
final def apply(message: T): Unit = this ! Value(message)
final def apply(message: T): Unit = self ! Value(message)
/**
* Submits the provided function of type 'T => T' for execution against the internal agent's state.
*/
final def send(message: (T) => T): Unit = this ! Function(message)
final def send(message: (T) => T): Unit = self ! Function(message)
/**
* Submits a new value to be set as the new agent's internal state.
*/
final def send(message: T): Unit = this ! Value(message)
final def send(message: T): Unit = self ! Value(message)
/**
* Asynchronously submits a procedure of type 'T => Unit' to read the internal state.
@ -180,7 +180,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
* of the internal state will be used, depending on the underlying effective copyStrategy.
* Does not change the value of the agent (this).
*/
final def sendProc(f: (T) => Unit): Unit = this ! Procedure(f)
final def sendProc(f: (T) => Unit): Unit = self ! Procedure(f)
/**
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.

View file

@ -26,12 +26,12 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio
* Rework of David Pollak's ActorPing class in the Lift Project
* which is licensed under the Apache 2 License.
*/
class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
class ScheduleActor(val receiver: ActorID, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
lifeCycle = Some(LifeCycle(Permanent))
def receive = {
case UnSchedule =>
Scheduler.stopSupervising(this)
Scheduler.stopSupervising(self)
future.cancel(true)
exit
}
@ -39,18 +39,18 @@ class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) ex
object Scheduler extends Actor {
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private val schedulers = new ConcurrentHashMap[Actor, Actor]
private val schedulers = new ConcurrentHashMap[ActorID, ActorID]
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
start
def schedule(receiver: Actor, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
def schedule(receiver: ActorID, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
try {
startLink(new ScheduleActor(
startLink(new ActorID(() => new ScheduleActor(
receiver,
service.scheduleAtFixedRate(new java.lang.Runnable {
def run = receiver ! message;
}, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]))
}, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]])))
} catch {
case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
}
@ -58,9 +58,9 @@ object Scheduler extends Actor {
def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
def stopSupervising(actor: Actor) = {
unlink(actor)
schedulers.remove(actor)
def stopSupervising(actorId: ActorID) = {
unlink(actorId)
schedulers.remove(actorId)
}
override def shutdown = {

View file

@ -82,6 +82,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
trapExit = trapExceptions
faultHandler = Some(handler)
// FIXME should Supervisor really havea newThreadBasedDispatcher??
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
private val actors = new ConcurrentHashMap[String, List[ActorID]]
@ -144,8 +146,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
if (list eq null) List[ActorID]()
else list
}
actors.put(className, supervisor.selfId :: currentSupervisors)
link(supervisor.selfId)
actors.put(className, supervisor.self :: currentSupervisors)
link(supervisor.self)
})
}
}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config
import com.google.inject._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher}
import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorID}
import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging
@ -94,7 +94,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
.activeObjects.put(targetClass.getName, proxy)
}
supervised ::= Supervise(actor, component.lifeCycle)
supervised ::= Supervise(new ActorID(() => actor), component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
}
@ -116,7 +116,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
.activeObjects.put(targetClass.getName, proxy)
}
supervised ::= Supervise(actor, component.lifeCycle)
supervised ::= Supervise(new ActorID(() => actor), component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
}

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{Actor, ActorID}
/**
* Scala API. Dispatcher factory.
@ -40,7 +40,7 @@ import se.scalablesolutions.akka.actor.Actor
*/
object Dispatchers {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor: Actor) = {
override def register(actor: ActorID) = {
if (isShutdown) init
super.register(actor)
}

View file

@ -172,7 +172,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private def donateMessage(receiver: ActorID, thief: ActorID): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast
if (donated != null) {
thief.selfId ! donated.message
thief.self ! donated.message
return Some(donated)
} else return None
}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorMessageInvoker}
import se.scalablesolutions.akka.actor.{Actor, ActorID, ActorMessageInvoker}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
@ -17,7 +17,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorMessageInvoker}
class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker)
extends MessageDispatcher {
def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(actor))
def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(new ActorID(() => actor)))
private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _

View file

@ -212,19 +212,19 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
* Registers a local endpoint
*/
def registerLocalNode(hostname: String, port: Int): Unit =
selfId ! RegisterLocalNode(RemoteAddress(hostname, port))
self ! RegisterLocalNode(RemoteAddress(hostname, port))
/**
* Deregisters a local endpoint
*/
def deregisterLocalNode(hostname: String, port: Int): Unit =
selfId ! DeregisterLocalNode(RemoteAddress(hostname, port))
self ! DeregisterLocalNode(RemoteAddress(hostname, port))
/**
* Broadcasts the specified message to all Actors of type Class on all known Nodes
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
selfId ! RelayedMessage(to.getName, msg)
self ! RelayedMessage(to.getName, msg)
}
/**
@ -261,7 +261,7 @@ object Cluster extends Cluster with Logging {
val sup = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
Supervise(actor.selfId, LifeCycle(Permanent)) :: Nil)
Supervise(actor.self, LifeCycle(Permanent)) :: Nil)
).newInstance
Some(sup)
}

View file

@ -5,7 +5,7 @@
package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorID}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.config.Config.config
@ -53,21 +53,21 @@ object RemoteClient extends Logging {
// FIXME: simplify overloaded methods when we have Scala 2.8
def actorFor(className: String, hostname: String, port: Int): Actor =
def actorFor(className: String, hostname: String, port: Int): ActorID =
actorFor(className, className, 5000L, hostname, port)
def actorFor(actorId: String, className: String, hostname: String, port: Int): Actor =
def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorID =
actorFor(actorId, className, 5000L, hostname, port)
def actorFor(className: String, timeout: Long, hostname: String, port: Int): Actor =
def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorID =
actorFor(className, className, timeout, hostname, port)
def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): Actor = {
def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorID = new ActorID(() =>
new Actor {
start
val remoteClient = RemoteClient.clientFor(hostname, port)
override def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
override def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
@ -77,10 +77,10 @@ object RemoteClient extends Logging {
.setIsOneWay(true)
.setIsEscaped(false)
if (sender.isDefined) {
val s = sender.get
requestBuilder.setSourceTarget(s.getClass.getName)
requestBuilder.setSourceUuid(s.uuid)
val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
val sndr = sender.get.actor
requestBuilder.setSourceTarget(sndr.getClass.getName)
requestBuilder.setSourceUuid(sndr.uuid)
val (host, port) = sndr._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port)
}
@ -108,7 +108,7 @@ object RemoteClient extends Logging {
def receive = {case _ => {}}
}
}
)
def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))
@ -174,8 +174,8 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
@volatile private[remote] var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[String, Actor]
private[remote] val listeners = new ConcurrentSkipListSet[Actor]
private val supervisors = new ConcurrentHashMap[String, ActorID]
private[remote] val listeners = new ConcurrentSkipListSet[ActorID]
private val channelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool,
@ -200,7 +200,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(connection.getCause))
listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(connection.getCause))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
}
isRunning = true
@ -232,21 +232,21 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
}
} else {
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception))
listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(exception))
throw exception
}
def registerSupervisorForActor(actor: Actor) =
if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision")
else supervisors.putIfAbsent(actor._supervisor.get.uuid, actor)
def registerSupervisorForActor(actorId: ActorID) =
if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision")
else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId)
def deregisterSupervisorForActor(actor: Actor) =
if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
else supervisors.remove(actor._supervisor.get.uuid)
def deregisterSupervisorForActor(actorId: ActorID) =
if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision")
else supervisors.remove(actorId.supervisor.get.uuid)
def registerListener(actor: Actor) = listeners.add(actor)
def registerListener(actorId: ActorID) = listeners.add(actorId)
def deregisterListener(actor: Actor) = listeners.remove(actor)
def deregisterListener(actorId: ActorID) = listeners.remove(actorId)
}
/**
@ -254,7 +254,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
*/
class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFuture[_]],
supervisors: ConcurrentMap[String, Actor],
supervisors: ConcurrentMap[String, ActorID],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
timer: HashedWheelTimer,
@ -285,7 +285,7 @@ class RemoteClientPipelineFactory(name: String,
@ChannelHandler.Sharable
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFuture[_]],
val supervisors: ConcurrentMap[String, Actor],
val supervisors: ConcurrentMap[String, ActorID],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
val timer: HashedWheelTimer,
@ -316,21 +316,21 @@ class RemoteClientHandler(val name: String,
if (!supervisors.containsKey(supervisorUuid))
throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor._supervisor.isDefined)
if (!supervisedActor.supervisor.isDefined)
throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor._supervisor.get ! Exit(supervisedActor, parseException(reply))
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply))
}
future.completeWithException(null, parseException(reply))
}
futures.remove(reply.getId)
} else {
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(exception))
throw exception
}
} catch {
case e: Exception =>
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(e))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(e))
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
@ -345,7 +345,7 @@ class RemoteClientHandler(val name: String,
// Wait until the connection attempt succeeds or fails.
client.connection.awaitUninterruptibly
if (!client.connection.isSuccess) {
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(client.connection.getCause))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
@ -353,17 +353,17 @@ class RemoteClientHandler(val name: String,
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientConnected(client.hostname, client.port))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientDisconnected(client.hostname, client.port))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(event.getCause))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(event.getCause))
log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close
}

View file

@ -198,7 +198,7 @@ class RemoteServer extends Logging {
/**
* Register Remote Actor by the Actor's 'id' field.
*/
def register(actor: Actor) = synchronized {
def register(actor: ActorID) = synchronized {
if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
@ -208,7 +208,7 @@ class RemoteServer extends Logging {
/**
* Register Remote Actor by a specific 'id' passed as argument.
*/
def register(id: String, actor: Actor) = synchronized {
def register(id: String, actor: ActorID) = synchronized {
if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
@ -225,7 +225,7 @@ class RemoteServerPipelineFactory(
val name: String,
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val actors: JMap[String, Actor],
val actors: JMap[String, ActorID],
val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory {
import RemoteServer._
@ -256,7 +256,7 @@ class RemoteServerHandler(
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val actors: JMap[String, Actor],
val actors: JMap[String, ActorID],
val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
@ -300,8 +300,8 @@ class RemoteServerHandler(
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
log.debug("Dispatching to remote actor [%s]", request.getTarget)
val actor = createActor(request.getTarget, request.getUuid, request.getTimeout)
actor.start
val actorId = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorId.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) {
@ -310,19 +310,19 @@ class RemoteServerHandler(
val targetClass = if (request.hasSourceTarget) request.getSourceTarget
else request.getTarget
val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout)
if (!remoteActor.isRunning) {
remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort)
remoteActor.start
val remoteActorId = createActor(targetClass, request.getSourceUuid, request.getTimeout)
if (!remoteActorId.isRunning) {
remoteActorId.makeRemote(request.getSourceHostname, request.getSourcePort)
remoteActorId.start
}
actor.!(message)(Some(remoteActor))
actorId.!(message)(Some(remoteActorId))
} else {
// couldn't find a way to reply, send the message without a source/sender
actor ! message
actorId ! message
}
} else {
try {
val resultOrNone = actor !! message
val resultOrNone = actorId !! message
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReply.newBuilder
@ -440,9 +440,9 @@ class RemoteServerHandler(
* If actor already created then just return it from the registry.
* Does not start the actor.
*/
private def createActor(name: String, uuid: String, timeout: Long): Actor = {
val actorOrNull = actors.get(uuid)
if (actorOrNull eq null) {
private def createActor(name: String, uuid: String, timeout: Long): ActorID = {
val actorIdOrNull = actors.get(uuid)
if (actorIdOrNull eq null) {
try {
log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
@ -451,13 +451,14 @@ class RemoteServerHandler(
newInstance._uuid = uuid
newInstance.timeout = timeout
newInstance._remoteAddress = None
actors.put(uuid, newInstance)
newInstance
val actorId = new ActorID(() => newInstance)
actors.put(uuid, actorId)
actorId
} catch {
case e =>
log.error(e, "Could not create remote actor instance")
throw e
}
} else actorOrNull
} else actorIdOrNull
}
}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{Actor, ActorID}
import se.scalablesolutions.akka.dispatch.CompletableFuture
/**
@ -26,7 +26,7 @@ import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFuture
def thread(body: => Unit) = {
val thread = new IsolatedEventBasedThread(body).start
val thread = new ActorID(() => new IsolatedEventBasedThread(body)).start
thread ! Start
thread
}
@ -60,7 +60,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private case object Get extends DataFlowVariableMessage
private val value = new AtomicReference[Option[T]](None)
private val blockedReaders = new ConcurrentLinkedQueue[Actor]
private val blockedReaders = new ConcurrentLinkedQueue[ActorID]
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
@ -97,7 +97,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
}
}
private[this] val in = new In(this)
private[this] val in = new ActorID(() => new In(this))
def <<(ref: DataFlowVariable[T]) = in ! Set(ref())
@ -107,7 +107,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
val ref = value.get
if (ref.isDefined) ref.get
else {
val out = new Out(this)
val out = new ActorID(() => new Out(this))
blockedReaders.offer(out)
val result = out !! Get
out ! Exit

View file

@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import Actor._
class ActorFireForgetRequestReplySpec extends JUnitSuite {
@ -22,7 +23,7 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite {
}
}
class SenderActor(replyActor: Actor) extends Actor {
class SenderActor(replyActor: ActorID) extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
@ -41,11 +42,9 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite {
@Test
def shouldReplyToBangMessageUsingReply = {
import Actor.Sender.Self
val replyActor = new ReplyActor
val replyActor = newActor[ReplyActor]
replyActor.start
val senderActor = new SenderActor(replyActor)
val senderActor = newActor(() => new SenderActor(replyActor))
senderActor.start
senderActor ! "Init"
assert(state.finished.await(1, TimeUnit.SECONDS))
@ -54,11 +53,9 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite {
@Test
def shouldReplyToBangMessageUsingImplicitSender = {
import Actor.Sender.Self
val replyActor = new ReplyActor
val replyActor = newActor[ReplyActor]
replyActor.start
val senderActor = new SenderActor(replyActor)
val senderActor = newActor(() => new SenderActor(replyActor))
senderActor.start
senderActor ! "InitImplicit"
assert(state.finished.await(1, TimeUnit.SECONDS))

View file

@ -2,6 +2,7 @@ package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
class ActorRegistrySpec extends JUnitSuite {
var record = ""
@ -16,18 +17,18 @@ class ActorRegistrySpec extends JUnitSuite {
@Test def shouldGetActorByIdFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId == "MyID")
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID")
actor.stop
}
@Test def shouldGetActorByUUIDFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
val actor = newActor[TestActor]
val uuid = actor.uuid
actor.start
val actorOrNone = ActorRegistry.actorFor(uuid)
@ -38,95 +39,95 @@ class ActorRegistrySpec extends JUnitSuite {
@Test def shouldGetActorByClassFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID")
actor.stop
}
@Test def shouldGetActorByManifestFromActorRegistry = {
ActorRegistry.shutdownAll
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
val actors = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 1)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID")
actor.stop
}
@Test def shouldGetActorsByIdFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val actors = ActorRegistry.actorsFor("MyID")
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetActorsByClassFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val actors = ActorRegistry.actorsFor(classOf[TestActor])
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetActorsByManifestFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val actors: List[TestActor] = ActorRegistry.actorsFor[TestActor]
val actors = ActorRegistry.actorsFor[TestActor]
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetAllActorsFromActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val actors = ActorRegistry.actors
assert(actors.size === 2)
assert(actors.head.isInstanceOf[TestActor])
assert(actors.head.getId === "MyID")
assert(actors.last.isInstanceOf[TestActor])
assert(actors.last.getId === "MyID")
assert(actors.head.actor.isInstanceOf[TestActor])
assert(actors.head.actor.asInstanceOf[TestActor].getId === "MyID")
assert(actors.last.actor.isInstanceOf[TestActor])
assert(actors.last.actor.asInstanceOf[TestActor].getId === "MyID")
actor1.stop
actor2.stop
}
@Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
record = ""
ActorRegistry.foreach(actor => actor !! "ping")
@ -137,9 +138,9 @@ class ActorRegistrySpec extends JUnitSuite {
@Test def shouldShutdownAllActorsInActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
ActorRegistry.shutdownAll
assert(ActorRegistry.actors.size === 0)
@ -147,9 +148,9 @@ class ActorRegistrySpec extends JUnitSuite {
@Test def shouldRemoveUnregisterActorInActorRegistry = {
ActorRegistry.shutdownAll
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
assert(ActorRegistry.actors.size === 2)
ActorRegistry.unregister(actor1)

View file

@ -3,6 +3,7 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.actor.Actor.transactor
import se.scalablesolutions.akka.stm.Transaction.Global.atomic
import se.scalablesolutions.akka.util.Logging
import Actor._
import org.scalatest.Suite
import org.scalatest.junit.JUnitRunner

View file

@ -8,6 +8,7 @@ import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
import Actor._
case class Send(actor: Actor)
@ -44,7 +45,7 @@ object SendOneWayAndReplySenderActor {
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: Actor = _
var sendTo: ActorID = _
var latch: CountDownLatch = _
def sendOff = sendTo ! "Hello"
@ -89,7 +90,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
@Test
def shouldSendOneWay = {
val actor = new RemoteActorSpecActorUnidirectional
val actor = newActor[RemoteActorSpecActorUnidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
actor ! "OneWay"
@ -99,24 +100,24 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
@Test
def shouldSendOneWayAndReceiveReply = {
val actor = new SendOneWayAndReplyReceiverActor
val actor = newActor[SendOneWayAndReplyReceiverActor]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val sender = new SendOneWayAndReplySenderActor
val sender = newActor[SendOneWayAndReplySenderActor]
sender.setReplyToAddress(HOSTNAME, PORT2)
sender.sendTo = actor
sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendTo = actor
sender.start
sender.sendOff
sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendOff
assert(SendOneWayAndReplySenderActor.latch.await(1, TimeUnit.SECONDS))
assert(sender.state.isDefined === true)
assert("World" === sender.state.get.asInstanceOf[String])
assert(sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.isDefined === true)
assert("World" === sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.get.asInstanceOf[String])
actor.stop
sender.stop
}
@Test
def shouldSendBangBangMessageAndReceiveReply = {
val actor = new RemoteActorSpecActorBidirectional
val actor = newActor[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val result = actor !! "Hello"
@ -127,7 +128,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
@Test
def shouldSendAndReceiveRemoteException = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
val actor = newActor[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
try {

View file

@ -4,6 +4,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import Actor._
class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
import Actor.Sender.Self
@ -20,22 +21,26 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
}
}
@Test def shouldSendOneWay = {
val oneWay = new CountDownLatch(1)
val actor = new Actor {
dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid)
def receive = {
case "OneWay" => oneWay.countDown
}
object OneWayTestActor {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid)
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}
}
@Test def shouldSendOneWay = {
val actor = newActor[OneWayTestActor]
actor.start
val result = actor ! "OneWay"
assert(oneWay.await(1, TimeUnit.SECONDS))
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test def shouldSendReplySync = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val result: String = (actor !! ("Hello", 10000)).get
assert("World" === result)
@ -43,7 +48,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
}
@Test def shouldSendReplyAsync = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
@ -51,7 +56,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
}
@Test def shouldSendReceiveException = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
try {
actor !! "Failure"

View file

@ -5,13 +5,14 @@ import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.CountDownLatch
import Actor._
/**
* Tests the behaviour of the executor based event driven dispatcher when multiple actors are being dispatched on it.
*
* @author Jan Van Besien
*/
class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers with ActorTestUtil {
class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers {
class SlowActor(finishedCounter: CountDownLatch) extends Actor {
messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
id = "SlowActor"
@ -35,29 +36,26 @@ class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustM
}
}
@Test def slowActorShouldntBlockFastActor = verify(new TestActor {
def test = {
val sFinished = new CountDownLatch(50)
val fFinished = new CountDownLatch(10)
val s = new SlowActor(sFinished)
val f = new FastActor(fFinished)
handle(s, f) {
// send a lot of stuff to s
for (i <- 1 to 50) {
s ! i
}
// send some messages to f
for (i <- 1 to 10) {
f ! i
}
// now assert that f is finished while s is still busy
fFinished.await
assert(sFinished.getCount > 0)
}
@Test def slowActorShouldntBlockFastActor = {
val sFinished = new CountDownLatch(50)
val fFinished = new CountDownLatch(10)
val s = newActor(() => new SlowActor(sFinished)).start
val f = newActor(() => new FastActor(fFinished)).start
// send a lot of stuff to s
for (i <- 1 to 50) {
s ! i
}
})
// send some messages to f
for (i <- 1 to 10) {
f ! i
}
// now assert that f is finished while s is still busy
fFinished.await
assert(sFinished.getCount > 0)
f.stop
s.stop
}
}

View file

@ -6,13 +6,14 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import Actor._
import java.util.concurrent.{TimeUnit, CountDownLatch}
/**
* @author Jan Van Besien
*/
class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with MustMatchers with ActorTestUtil {
class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with MustMatchers {
val poolDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
@ -29,40 +30,38 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with
}
}
@Test def fastActorShouldStealWorkFromSlowActor = verify(new TestActor {
def test = {
val finishedCounter = new CountDownLatch(110)
@Test def fastActorShouldStealWorkFromSlowActor = {
val finishedCounter = new CountDownLatch(110)
val slow = new DelayableActor("slow", 50, finishedCounter)
val fast = new DelayableActor("fast", 10, finishedCounter)
val slow = newActor(() => new DelayableActor("slow", 50, finishedCounter)).start
val fast = newActor(() => new DelayableActor("fast", 10, finishedCounter)).start
handle(slow, fast) {
for (i <- 1 to 100) {
// send most work to slow actor
if (i % 20 == 0)
fast ! i
else
slow ! i
}
// now send some messages to actors to keep the dispatcher dispatching messages
for (i <- 1 to 10) {
Thread.sleep(150)
if (i % 2 == 0)
fast ! i
else
slow ! i
}
finishedCounter.await(5, TimeUnit.SECONDS)
fast.invocationCount must be > (slow.invocationCount)
}
for (i <- 1 to 100) {
// send most work to slow actor
if (i % 20 == 0)
fast ! i
else
slow ! i
}
})
@Test def canNotUseActorsOfDifferentTypesInSameDispatcher:Unit = {
val first = new FirstActor
val second = new SecondActor
// now send some messages to actors to keep the dispatcher dispatching messages
for (i <- 1 to 10) {
Thread.sleep(150)
if (i % 2 == 0)
fast ! i
else
slow ! i
}
finishedCounter.await(5, TimeUnit.SECONDS)
fast.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.actor.asInstanceOf[DelayableActor].invocationCount)
slow.stop
fast.stop
}
@Test def canNotUseActorsOfDifferentTypesInSameDispatcher = {
val first = newActor[FirstActor]
val second = newActor[SecondActor]
first.start
intercept[IllegalStateException] {

View file

@ -4,11 +4,12 @@ import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
class ForwardActorSpec extends JUnitSuite {
object ForwardState {
var sender: Actor = null
var sender: ActorID = null
val finished = new CountDownLatch(1)
}
@ -24,7 +25,7 @@ class ForwardActorSpec extends JUnitSuite {
class ForwardActor extends Actor {
val receiverActor = new ReceiverActor
val receiverActor = newActor[ReceiverActor]
receiverActor.start
def receive = {
case "SendBang" => receiverActor.forward("SendBang")
@ -33,7 +34,7 @@ class ForwardActorSpec extends JUnitSuite {
}
class BangSenderActor extends Actor {
val forwardActor = new ForwardActor
val forwardActor = newActor[ForwardActor]
forwardActor.start
forwardActor ! "SendBang"
def receive = {
@ -42,7 +43,7 @@ class ForwardActorSpec extends JUnitSuite {
}
class BangBangSenderActor extends Actor {
val forwardActor = new ForwardActor
val forwardActor = newActor[ForwardActor]
forwardActor.start
(forwardActor !! "SendBangBang") match {
case Some(_) => {ForwardState.finished.countDown}
@ -55,7 +56,7 @@ class ForwardActorSpec extends JUnitSuite {
@Test
def shouldForwardActorReferenceWhenInvokingForwardOnBang = {
val senderActor = new BangSenderActor
val senderActor = newActor[BangSenderActor]
senderActor.start
assert(ForwardState.finished.await(2, TimeUnit.SECONDS))
assert(ForwardState.sender ne null)
@ -64,7 +65,7 @@ class ForwardActorSpec extends JUnitSuite {
@Test
def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = {
val senderActor = new BangBangSenderActor
val senderActor = newActor[BangBangSenderActor]
senderActor.start
assert(ForwardState.finished.await(2, TimeUnit.SECONDS))
}

View file

@ -3,6 +3,7 @@ package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Futures
import Actor._
class FutureSpec extends JUnitSuite {
class TestActor extends Actor {
@ -16,7 +17,7 @@ class FutureSpec extends JUnitSuite {
}
@Test def shouldActorReplyResultThroughExplicitFuture = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val future = actor !!! "Hello"
future.await
@ -26,7 +27,7 @@ class FutureSpec extends JUnitSuite {
}
@Test def shouldActorReplyExceptionThroughExplicitFuture = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val future = actor !!! "Failure"
future.await
@ -37,9 +38,9 @@ class FutureSpec extends JUnitSuite {
/*
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
@ -51,9 +52,9 @@ class FutureSpec extends JUnitSuite {
}
@Test def shouldFutureAwaitEitherRight = {
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
@ -65,9 +66,9 @@ class FutureSpec extends JUnitSuite {
}
*/
@Test def shouldFutureAwaitOneLeft = {
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
@ -79,9 +80,9 @@ class FutureSpec extends JUnitSuite {
}
@Test def shouldFutureAwaitOneRight = {
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
@ -93,9 +94,9 @@ class FutureSpec extends JUnitSuite {
}
@Test def shouldFutureAwaitAll = {
val actor1 = new TestActor
val actor1 = newActor[TestActor]
actor1.start
val actor2 = new TestActor
val actor2 = newActor[TestActor]
actor2.start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "Hello"
@ -107,5 +108,4 @@ class FutureSpec extends JUnitSuite {
actor1.stop
actor2.stop
}
}

View file

@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.stm.{TransactionalState, TransactionalMap, TransactionalRef, TransactionalVector}
import Actor._
case class GetMapState(key: String)
case object GetVectorState
@ -15,15 +16,18 @@ case class SetMapState(key: String, value: String)
case class SetVectorState(key: String)
case class SetRefState(key: String)
case class Success(key: String, value: String)
case class Failure(key: String, value: String, failer: Actor)
case class Failure(key: String, value: String, failer: ActorID)
case class SetMapStateOneWay(key: String, value: String)
case class SetVectorStateOneWay(key: String)
case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: Actor)
case class FailureOneWay(key: String, value: String, failer: ActorID)
class InMemStatefulActor(expectedInvocationCount:Int) extends Actor {
case object GetNotifier
case class Notifier(latch: CountDownLatch)
class InMemStatefulActor(expectedInvocationCount: Int) extends Actor {
def this() = this(0)
timeout = 5000
makeTransactionRequired
@ -35,6 +39,8 @@ class InMemStatefulActor(expectedInvocationCount:Int) extends Actor {
private lazy val refState = TransactionalState.newRef[String]
def receive = {
case GetNotifier =>
reply(Notifier(notifier))
case GetMapState(key) =>
reply(mapState.get(key).get)
notifier.countDown
@ -105,17 +111,18 @@ class InMemFailerActor extends Actor {
class InMemoryActorSpec extends JUnitSuite {
@Test
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor(2)
val stateful = newActor(() => new InMemStatefulActor(2))
stateful.start
stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert(stateful.notifier.await(1, TimeUnit.SECONDS))
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
@Test
def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
val stateful = newActor[InMemStatefulActor]
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
@ -124,22 +131,23 @@ class InMemoryActorSpec extends JUnitSuite {
@Test
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor(2)
val stateful = newActor(() => new InMemStatefulActor(2))
stateful.start
val failer = new InMemFailerActor
val failer = newActor[InMemFailerActor]
failer.start
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
assert(stateful.notifier.await(1, TimeUnit.SECONDS))
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
@Test
def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
val stateful = newActor[InMemStatefulActor]
stateful.start
stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
val failer = new InMemFailerActor
val failer = newActor[InMemFailerActor]
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
@ -150,17 +158,18 @@ class InMemoryActorSpec extends JUnitSuite {
@Test
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor(2)
val stateful = newActor(() => new InMemStatefulActor(2))
stateful.start
stateful ! SetVectorStateOneWay("init") // set init state
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert(stateful.notifier.await(1, TimeUnit.SECONDS))
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert(2 === (stateful !! GetVectorSize).get)
}
@Test
def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
val stateful = newActor[InMemStatefulActor]
stateful.start
stateful !! SetVectorState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
@ -169,23 +178,24 @@ class InMemoryActorSpec extends JUnitSuite {
@Test
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor(2)
val stateful = newActor(() => new InMemStatefulActor(2))
stateful.start
stateful ! SetVectorStateOneWay("init") // set init state
Thread.sleep(1000)
val failer = new InMemFailerActor
val failer = newActor[InMemFailerActor]
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
assert(stateful.notifier.await(1, TimeUnit.SECONDS))
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert(1 === (stateful !! GetVectorSize).get)
}
@Test
def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
val stateful = newActor[InMemStatefulActor]
stateful.start
stateful !! SetVectorState("init") // set init state
val failer = new InMemFailerActor
val failer = newActor[InMemFailerActor]
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
@ -196,17 +206,18 @@ class InMemoryActorSpec extends JUnitSuite {
@Test
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor(2)
val stateful = newActor(() => new InMemStatefulActor(2))
stateful.start
stateful ! SetRefStateOneWay("init") // set init state
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert(stateful.notifier.await(1, TimeUnit.SECONDS))
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert("new state" === (stateful !! GetRefState).get)
}
@Test
def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
val stateful = newActor[InMemStatefulActor]
stateful.start
stateful !! SetRefState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
@ -215,23 +226,24 @@ class InMemoryActorSpec extends JUnitSuite {
@Test
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor(2)
val stateful = newActor(() => new InMemStatefulActor(2))
stateful.start
stateful ! SetRefStateOneWay("init") // set init state
Thread.sleep(1000)
val failer = new InMemFailerActor
val failer = newActor[InMemFailerActor]
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
assert(stateful.notifier.await(1, TimeUnit.SECONDS))
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
}
@Test
def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
val stateful = newActor[InMemStatefulActor]
stateful.start
stateful !! SetRefState("init") // set init state
val failer = new InMemFailerActor
val failer = newActor[InMemFailerActor]
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method

View file

@ -2,6 +2,7 @@ package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
class MemoryFootprintSpec extends JUnitSuite {
class Mem extends Actor {

View file

@ -1,307 +0,0 @@
package se.scalablesolutions.akka
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import net.lag.logging.Logger
/**
* The Computer Language Benchmarks Game
* <p/>
* URL: [http://shootout.alioth.debian.org/]
* <p/>
* Contributed by Julien Gaugaz.
* <p/>
* Inspired by the version contributed by Yura Taras and modified by Isaac Gouy.
*/
class PerformanceSpec extends JUnitSuite {
@Test
def dummyTest = assert(true)
// @Test
def benchAkkaActorsVsScalaActors = {
def stressTestAkkaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {
import se.scalablesolutions.akka.actor.Actor
abstract class Colour
case object RED extends Colour
case object YELLOW extends Colour
case object BLUE extends Colour
case object FADED extends Colour
val colours = Array[Colour](BLUE, RED, YELLOW)
case class Meet(from: Actor, colour: Colour)
case class Change(colour: Colour)
case class MeetingCount(count: Int)
case class ExitActor(actor: Actor, reason: String)
var totalTime = 0L
class Mall(var nrMeets: Int, numChameneos: Int) extends Actor {
var waitingChameneo: Option[Actor] = None
var sumMeetings = 0
var numFaded = 0
var startTime: Long = 0L
start
def startChameneos(): Unit = {
startTime = System.currentTimeMillis
var i = 0
while (i < numChameneos) {
Chameneo(this, colours(i % 3), i).start
i = i + 1
}
}
protected def sender : Option[Actor] = replyTo match {
case Some(Left(actor)) => Some(actor)
case _ => None
}
def receive = {
case MeetingCount(i) => {
numFaded = numFaded + 1
sumMeetings = sumMeetings + i
if (numFaded == numChameneos) {
totalTime = System.currentTimeMillis - startTime
println("time: " + totalTime)
exit
}
}
case msg@Meet(a, c) => {
if (nrMeets > 0) {
waitingChameneo match {
case Some(chameneo) =>
nrMeets = nrMeets - 1
chameneo ! msg
waitingChameneo = None
case None =>
waitingChameneo = sender
}
} else {
waitingChameneo match {
case Some(chameneo) =>
chameneo ! ExitActor(this, "normal")
case None =>
}
sender.get ! ExitActor(this, "normal")
}
}
}
}
case class Chameneo(var mall: Mall, var colour: Colour, cid: Int) extends Actor {
var meetings = 0
override def start = {
val r = super.start
mall ! Meet(this, colour)
r
}
protected def sender : Option[Actor] = replyTo match {
case Some(Left(actor)) => Some(actor)
case _ => None
}
override def receive: PartialFunction[Any, Unit] = {
case Meet(from, otherColour) =>
colour = complement(otherColour)
meetings = meetings + 1
from ! Change(colour)
mall ! Meet(this, colour)
case Change(newColour) =>
colour = newColour
meetings = meetings + 1
mall ! Meet(this, colour)
case ExitActor(_, _) =>
colour = FADED
sender.get ! MeetingCount(meetings)
exit
}
def complement(otherColour: Colour): Colour = {
colour match {
case RED => otherColour match {
case RED => RED
case YELLOW => BLUE
case BLUE => YELLOW
case FADED => FADED
}
case YELLOW => otherColour match {
case RED => BLUE
case YELLOW => YELLOW
case BLUE => RED
case FADED => FADED
}
case BLUE => otherColour match {
case RED => YELLOW
case YELLOW => RED
case BLUE => BLUE
case FADED => FADED
}
case FADED => FADED
}
}
override def toString() = cid + "(" + colour + ")"
}
val mall = new Mall(nrOfMessages, nrOfActors)
mall.startChameneos
Thread.sleep(sleepTime)
totalTime
}
def stressTestScalaActors(nrOfMessages: Int, nrOfActors: Int, sleepTime: Int): Long = {
var totalTime = 0L
import scala.actors._
import scala.actors.Actor._
abstract class Colour
case object RED extends Colour
case object YELLOW extends Colour
case object BLUE extends Colour
case object FADED extends Colour
val colours = Array[Colour](BLUE, RED, YELLOW)
case class Meet(colour: Colour)
case class Change(colour: Colour)
case class MeetingCount(count: Int)
class Mall(var n: Int, numChameneos: Int) extends Actor {
var waitingChameneo: Option[OutputChannel[Any]] = None
var startTime: Long = 0L
start()
def startChameneos(): Unit = {
startTime = System.currentTimeMillis
var i = 0
while (i < numChameneos) {
Chameneo(this, colours(i % 3), i).start()
i = i + 1
}
}
def act() {
var sumMeetings = 0
var numFaded = 0
loop {
react {
case MeetingCount(i) => {
numFaded = numFaded + 1
sumMeetings = sumMeetings + i
if (numFaded == numChameneos) {
totalTime = System.currentTimeMillis - startTime
exit()
}
}
case msg@Meet(c) => {
if (n > 0) {
waitingChameneo match {
case Some(chameneo) =>
n = n - 1
chameneo.forward(msg)
waitingChameneo = None
case None =>
waitingChameneo = Some(sender)
}
} else {
waitingChameneo match {
case Some(chameneo) =>
chameneo ! Exit(this, "normal")
case None =>
}
sender ! Exit(this, "normal")
}
}
}
}
}
}
case class Chameneo(var mall: Mall, var colour: Colour, id: Int) extends Actor {
var meetings = 0
def act() {
loop {
mall ! Meet(colour)
react {
case Meet(otherColour) =>
colour = complement(otherColour)
meetings = meetings + 1
sender ! Change(colour)
case Change(newColour) =>
colour = newColour
meetings = meetings + 1
case Exit(_, _) =>
colour = FADED
sender ! MeetingCount(meetings)
exit()
}
}
}
def complement(otherColour: Colour): Colour = {
colour match {
case RED => otherColour match {
case RED => RED
case YELLOW => BLUE
case BLUE => YELLOW
case FADED => FADED
}
case YELLOW => otherColour match {
case RED => BLUE
case YELLOW => YELLOW
case BLUE => RED
case FADED => FADED
}
case BLUE => otherColour match {
case RED => YELLOW
case YELLOW => RED
case BLUE => BLUE
case FADED => FADED
}
case FADED => FADED
}
}
override def toString() = id + "(" + colour + ")"
}
val mall = new Mall(nrOfMessages, nrOfActors)
mall.startChameneos
Thread.sleep(sleepTime)
totalTime
}
Logger.INFO
println("===========================================")
println("== Benchmark Akka Actors vs Scala Actors ==")
var nrOfMessages = 2000000
var nrOfActors = 4
var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 30)
var scalaTime = stressTestScalaActors(nrOfMessages, nrOfActors, 1000 * 40)
var ratio: Double = scalaTime.toDouble / akkaTime.toDouble
println("\tNr of messages:\t" + nrOfMessages)
println("\tNr of actors:\t" + nrOfActors)
println("\tAkka Actors:\t" + akkaTime + "\t milliseconds")
println("\tScala Actors:\t" + scalaTime + "\t milliseconds")
println("\tAkka is " + ratio + " times faster\n")
println("===========================================")
assert(true)
}
}

View file

@ -3,11 +3,11 @@ package se.scalablesolutions.akka.actor
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
import se.scalablesolutions.akka.dispatch.Dispatchers
class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite {
import Actor.Sender.Self
private val unit = TimeUnit.MILLISECONDS
@ -22,22 +22,26 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
}
}
@Test def shouldSendOneWay = {
val oneWay = new CountDownLatch(1)
val actor = new Actor {
dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(uuid)
def receive = {
case "OneWay" => oneWay.countDown
}
object OneWayTestActor {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid)
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}
}
@Test def shouldSendOneWay = {
val actor = newActor[OneWayTestActor]
actor.start
val result = actor ! "OneWay"
assert(oneWay.await(1, TimeUnit.SECONDS))
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test def shouldSendReplySync = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val result: String = (actor !! ("Hello", 10000)).get
assert("World" === result)
@ -45,7 +49,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
}
@Test def shouldSendReplyAsync = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
@ -53,7 +57,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
}
@Test def shouldSendReceiveException = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
try {
actor !! "Failure"

View file

@ -3,7 +3,9 @@ package se.scalablesolutions.akka.actor
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import Actor._
class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
import Actor.Sender.Self
@ -22,12 +24,12 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
@Test def shouldSendOneWay = {
val oneWay = new CountDownLatch(1)
val actor = new Actor {
val actor = newActor(() => new Actor {
dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid)
def receive = {
case "OneWay" => oneWay.countDown
}
}
})
actor.start
val result = actor ! "OneWay"
assert(oneWay.await(1, TimeUnit.SECONDS))
@ -35,7 +37,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
}
@Test def shouldSendReplySync = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val result: String = (actor !! ("Hello", 10000)).get
assert("World" === result)
@ -43,7 +45,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
}
@Test def shouldSendReplyAsync = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
@ -51,7 +53,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
}
@Test def shouldSendReceiveException = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
try {
actor !! "Failure"

View file

@ -1,31 +0,0 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.remote.RemoteNode
import se.scalablesolutions.akka.actor._
import Actor.Sender.Self
import org.scalatest.junit.JUnitSuite
import org.junit.Test
/*
class RemoteClientShutdownTest extends JUnitSuite {
@Test def shouldShutdownRemoteClient = {
RemoteNode.start("localhost", 9999)
var remote = new TravelingActor
remote.start
remote ! "sending a remote message"
remote.stop
Thread.sleep(1000)
RemoteNode.shutdown
println("======= REMOTE CLIENT SHUT DOWN FINE =======")
assert(true)
}
}
class TravelingActor extends RemoteActor("localhost", 9999) {
def receive = {
case _ => log.info("message received")
}
}
*/

View file

@ -4,12 +4,13 @@
package se.scalablesolutions.akka.actor
import _root_.java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import se.scalablesolutions.akka.serialization.BinaryString
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer}
import se.scalablesolutions.akka.OneWay
import se.scalablesolutions.akka.dispatch.Dispatchers
import Actor._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@ -82,9 +83,9 @@ class RemoteSupervisorSpec extends JUnitSuite {
}).start
Thread.sleep(1000)
var pingpong1: RemotePingPong1Actor = _
var pingpong2: RemotePingPong2Actor = _
var pingpong3: RemotePingPong3Actor = _
var pingpong1: ActorID = _
var pingpong2: ActorID = _
var pingpong3: ActorID = _
@Test def shouldStartServer = {
Log.messageLog.clear
@ -334,7 +335,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
// Then create a concrete container in which we mix in support for the specific
// implementation of the Actors we want to use.
pingpong1 = new RemotePingPong1Actor
pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
@ -349,7 +350,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
}
def getSingleActorOneForOneSupervisor: Supervisor = {
pingpong1 = new RemotePingPong1Actor
pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
@ -363,11 +364,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
}
def getMultipleActorsAllForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor
pingpong2 = newActor[RemotePingPong2Actor]
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor
pingpong3 = newActor[RemotePingPong3Actor]
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
@ -389,11 +390,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
}
def getMultipleActorsOneForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor
pingpong2 = newActor[RemotePingPong2Actor]
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor
pingpong3 = newActor[RemotePingPong3Actor]
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(
@ -415,11 +416,11 @@ class RemoteSupervisorSpec extends JUnitSuite {
}
def getNestedSupervisorsAllForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
pingpong1 = newActor[RemotePingPong1Actor]
pingpong1.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong2 = new RemotePingPong2Actor
pingpong2 = newActor[RemotePingPong2Actor]
pingpong2.makeRemote(RemoteServer.HOSTNAME, 9988)
pingpong3 = new RemotePingPong3Actor
pingpong3 = newActor[RemotePingPong3Actor]
pingpong3.makeRemote(RemoteServer.HOSTNAME, 9988)
val factory = SupervisorFactory(

View file

@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
class SchedulerSpec extends JUnitSuite {

View file

@ -4,6 +4,8 @@ import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteNode}
import se.scalablesolutions.akka.util.Logging
import Actor._
class HelloWorldActor extends Actor {
start
def receive = {
@ -15,7 +17,7 @@ object ServerInitiatedRemoteActorServer {
def run = {
RemoteNode.start("localhost", 9999)
RemoteNode.register("hello-service", new HelloWorldActor)
RemoteNode.register("hello-service", newActor[HelloWorldActor])
}
def main(args: Array[String]) = run

View file

@ -4,6 +4,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import Actor._
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
@ -12,7 +13,7 @@ object ServerInitiatedRemoteActorSpec {
val PORT = 9990
var server: RemoteServer = null
case class Send(actor: Actor)
case class Send(actor: ActorID)
object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1)
@ -42,23 +43,21 @@ object ServerInitiatedRemoteActorSpec {
class RemoteActorSpecActorAsyncSender extends Actor {
start
def receive = {
case Send(actor: Actor) =>
case Send(actor: ActorID) =>
actor ! "Hello"
case "World" =>
RemoteActorSpecActorAsyncSender.latch.countDown
}
def send(actor: Actor) {
this ! Send(actor)
def send(actor: ActorID) {
self ! Send(actor)
}
}
}
class ServerInitiatedRemoteActorSpec extends JUnitSuite {
import ServerInitiatedRemoteActorSpec._
import Actor.Sender.Self
se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.config.Config.config
private val unit = TimeUnit.MILLISECONDS
@ -68,9 +67,9 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
server.start(HOSTNAME, PORT)
server.register(new RemoteActorSpecActorUnidirectional)
server.register(new RemoteActorSpecActorBidirectional)
server.register(new RemoteActorSpecActorAsyncSender)
server.register(newActor[RemoteActorSpecActorUnidirectional])
server.register(newActor[RemoteActorSpecActorBidirectional])
server.register(newActor[RemoteActorSpecActorAsyncSender])
Thread.sleep(1000)
}

View file

@ -2,6 +2,8 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.actor.Actor
import Actor._
object ActorShutdownRunner {
def main(args: Array[String]) {
class MyActor extends Actor {
@ -11,7 +13,7 @@ object ActorShutdownRunner {
}
}
val myActor = new MyActor
val myActor = newActor[MyActor]
myActor.start
myActor ! "test"
myActor.stop

View file

@ -2,6 +2,8 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.stm._
import Actor._
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers

View file

@ -4,10 +4,12 @@
package se.scalablesolutions.akka.actor
import _root_.java.util.concurrent.{TimeUnit, BlockingQueue, LinkedBlockingQueue}
import java.util.concurrent.{TimeUnit, BlockingQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.{OneWay, Die, Ping}
import Actor._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@ -20,9 +22,9 @@ class SupervisorSpec extends JUnitSuite {
var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
var oneWayLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
var pingpong1: PingPong1Actor = _
var pingpong2: PingPong2Actor = _
var pingpong3: PingPong3Actor = _
var pingpong1: ActorID = _
var pingpong2: ActorID = _
var pingpong3: ActorID = _
@Test def shouldStartServer = {
messageLog.clear
@ -370,7 +372,7 @@ class SupervisorSpec extends JUnitSuite {
// Creat some supervisors with different configurations
def getSingleActorAllForOneSupervisor: Supervisor = {
pingpong1 = new PingPong1Actor
pingpong1 = newActor[PingPong1Actor]
val factory = SupervisorFactory(
SupervisorConfig(
@ -383,7 +385,7 @@ class SupervisorSpec extends JUnitSuite {
}
def getSingleActorOneForOneSupervisor: Supervisor = {
pingpong1 = new PingPong1Actor
pingpong1 = newActor[PingPong1Actor]
val factory = SupervisorFactory(
SupervisorConfig(
@ -396,9 +398,9 @@ class SupervisorSpec extends JUnitSuite {
}
def getMultipleActorsAllForOneConf: Supervisor = {
pingpong1 = new PingPong1Actor
pingpong2 = new PingPong2Actor
pingpong3 = new PingPong3Actor
pingpong1 = newActor[PingPong1Actor]
pingpong2 = newActor[PingPong2Actor]
pingpong3 = newActor[PingPong3Actor]
val factory = SupervisorFactory(
SupervisorConfig(
@ -419,9 +421,9 @@ class SupervisorSpec extends JUnitSuite {
}
def getMultipleActorsOneForOneConf: Supervisor = {
pingpong1 = new PingPong1Actor
pingpong2 = new PingPong2Actor
pingpong3 = new PingPong3Actor
pingpong1 = newActor[PingPong1Actor]
pingpong2 = newActor[PingPong2Actor]
pingpong3 = newActor[PingPong3Actor]
val factory = SupervisorFactory(
SupervisorConfig(
@ -442,9 +444,9 @@ class SupervisorSpec extends JUnitSuite {
}
def getNestedSupervisorsAllForOneConf: Supervisor = {
pingpong1 = new PingPong1Actor
pingpong2 = new PingPong2Actor
pingpong3 = new PingPong3Actor
pingpong1 = newActor[PingPong1Actor]
pingpong2 = newActor[PingPong2Actor]
pingpong3 = newActor[PingPong3Actor]
val factory = SupervisorFactory(
SupervisorConfig(

View file

@ -1,26 +0,0 @@
package se.scalablesolutions.akka.actor
/**
* Actor which can be used as the basis for unit testing actors. I automatically start and stops all involved handlers before and after
* the test.
*/
abstract class TestActor extends Actor with ActorTestUtil {
def test: Unit
def receive = {case _ =>}
}
trait ActorTestUtil {
def handle[T](actors: Actor*)(test: => T): T = {
for (a <- actors) a.start
try {
test
}
finally {
for (a <- actors) a.stop
}
}
def verify(actor: TestActor): Unit = handle(actor) {
actor.test
}
}

View file

@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import Actor._
class ThreadBasedActorSpec extends JUnitSuite {
@ -23,12 +24,12 @@ class ThreadBasedActorSpec extends JUnitSuite {
@Test def shouldSendOneWay = {
var oneWay = new CountDownLatch(1)
val actor = new Actor {
val actor = newActor(() => new Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case "OneWay" => oneWay.countDown
}
}
})
actor.start
val result = actor ! "OneWay"
assert(oneWay.await(1, TimeUnit.SECONDS))
@ -36,7 +37,7 @@ class ThreadBasedActorSpec extends JUnitSuite {
}
@Test def shouldSendReplySync = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val result: String = (actor !! ("Hello", 10000)).get
assert("World" === result)
@ -44,7 +45,7 @@ class ThreadBasedActorSpec extends JUnitSuite {
}
@Test def shouldSendReplyAsync = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
@ -52,7 +53,7 @@ class ThreadBasedActorSpec extends JUnitSuite {
}
@Test def shouldSendReceiveException = {
val actor = new TestActor
val actor = newActor[TestActor]
actor.start
try {
actor !! "Failure"

View file

@ -10,12 +10,13 @@ import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before}
import se.scalablesolutions.akka.actor.Actor
import Actor._
class ThreadBasedDispatcherSpec extends JUnitSuite {
private var threadingIssueDetected: AtomicBoolean = null
val key1 = new Actor { def receive = { case _ => {}} }
val key2 = new Actor { def receive = { case _ => {}} }
val key3 = new Actor { def receive = { case _ => {}} }
val key1 = newActor(() => new Actor { def receive = { case _ => {}} })
val key2 = newActor(() => new Actor { def receive = { case _ => {}} })
val key3 = newActor(() => new Actor { def receive = { case _ => {}} })
class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
val guardLock: Lock = new ReentrantLock

View file

@ -5,6 +5,8 @@ import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.Actor._
@RunWith(classOf[JUnitRunner])
class TransactionalRefSpec extends Spec with ShouldMatchers {
@ -37,7 +39,7 @@ class TransactionalRefSpec extends Spec with ShouldMatchers {
val ref = Ref(0)
def increment = atomic {
ref alter (_ + 1)
ref alter (_ + 1)
}
increment
@ -53,7 +55,7 @@ class TransactionalRefSpec extends Spec with ShouldMatchers {
val ref = Ref[Int]
def increment = atomic {
ref alter (_ + 1)
ref alter (_ + 1)
}
evaluating { increment } should produce [RuntimeException]
@ -63,7 +65,7 @@ class TransactionalRefSpec extends Spec with ShouldMatchers {
val ref1 = Ref(1)
val ref2 = atomic {
ref1 map (_ + 1)
ref1 map (_ + 1)
}
val value1 = atomic { ref1.get.get }
@ -91,7 +93,7 @@ class TransactionalRefSpec extends Spec with ShouldMatchers {
val ref1 = Ref(1)
val ref2 = atomic {
for (value <- ref1) yield value + 2
for (value <- ref1) yield value + 2
}
val value2 = atomic { ref2.get.get }
@ -104,10 +106,10 @@ class TransactionalRefSpec extends Spec with ShouldMatchers {
val ref2 = Ref(2)
val ref3 = atomic {
for {
value1 <- ref1
value2 <- ref2
} yield value1 + value2
for {
value1 <- ref1
value2 <- ref2
} yield value1 + value2
}
val value3 = atomic { ref3.get.get }
@ -119,13 +121,13 @@ class TransactionalRefSpec extends Spec with ShouldMatchers {
val ref1 = Ref(1)
val refLess2 = atomic {
for (value <- ref1 if value < 2) yield value
for (value <- ref1 if value < 2) yield value
}
val optLess2 = atomic { refLess2.get }
val refGreater2 = atomic {
for (value <- ref1 if value > 2) yield value
for (value <- ref1 if value > 2) yield value
}
val optGreater2 = atomic { refGreater2.get }

View file

@ -22,7 +22,7 @@
package se.scalablesolutions.akka.security
import se.scalablesolutions.akka.actor.{Scheduler, Actor, ActorRegistry}
import se.scalablesolutions.akka.actor.{Scheduler, Actor, ActorID, ActorRegistry}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config
@ -73,7 +73,7 @@ case class SpnegoCredentials(token: Array[Byte]) extends Credentials
* Jersey Filter for invocation intercept and authorization/authentication
*/
class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging {
class Filter(actor: Actor, rolesAllowed: Option[List[String]])
class Filter(actor: ActorID, rolesAllowed: Option[List[String]])
extends ResourceFilter with ContainerRequestFilter with Logging {
override def getRequestFilter: ContainerRequestFilter = this
@ -111,7 +111,7 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging {
* Currently we always take the first, since there usually should be at most one authentication actor, but a round-robin
* strategy could be implemented in the future
*/
def authenticator: Actor = ActorRegistry.actorsFor(authenticatorFQN).head
def authenticator: ActorID = ActorRegistry.actorsFor(authenticatorFQN).head
def mkFilter(roles: Option[List[String]]): java.util.List[ResourceFilter] =
java.util.Collections.singletonList(new Filter(authenticator, roles))