ActorID: now all test pass, mission accomplished, ready for master

This commit is contained in:
Jonas Bonér 2010-05-03 21:00:54 +02:00
parent 7b1e43c89e
commit 4f66e90aa0
10 changed files with 42 additions and 35 deletions

View file

@ -180,9 +180,9 @@ class ProducerResponseSender(
def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers)) def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers))
private def reply(message: Any) = replyTo match { private def reply(message: Any) = replyTo match {
case Some(Left(actor)) => actor ! message case Some(Left(actor)) => actor ! message
case Some(Right(future)) => future.completeWithResult(message) case Some(Right(future)) => future.completeWithResult(message)
case _ => log.warning("No destination for sending response") case _ => log.warning("No destination for sending response")
} }
} }

View file

@ -85,8 +85,8 @@ class PublishRequestor(consumerPublisher: ActorID) extends Actor {
protected def receive = { protected def receive = {
case ActorUnregistered(actor) => { /* ignore */ } case ActorUnregistered(actor) => { /* ignore */ }
case ActorRegistered(actor) => Publish.forConsumer(actor) match { case ActorRegistered(actor) => Publish.forConsumer(actor) match {
case Some(publish) => consumerPublisher ! publish case Some(publish) => consumerPublisher ! publish
case None => { /* ignore */ } case None => { /* ignore */ }
} }
} }
} }
@ -121,15 +121,15 @@ object Publish {
def forConsumer(actor: ActorID): Option[Publish] = def forConsumer(actor: ActorID): Option[Publish] =
forConsumeAnnotated(actor) orElse forConsumerType(actor) forConsumeAnnotated(actor) orElse forConsumerType(actor)
private def forConsumeAnnotated(actor: ActorID): Option[Publish] = { private def forConsumeAnnotated(actorId: ActorID): Option[Publish] = {
val annotation = actor.getClass.getAnnotation(classOf[consume]) val annotation = actorId.actorInstanceClass.getAnnotation(classOf[consume])
if (annotation eq null) None if (annotation eq null) None
else if (actor.remoteAddress.isDefined) None // do not publish proxies else if (actorId.remoteAddress.isDefined) None // do not publish proxies
else Some(Publish(annotation.value, actor.getId, false)) else Some(Publish(annotation.value, actorId.id, false))
} }
private def forConsumerType(actor: ActorID): Option[Publish] = private def forConsumerType(actorId: ActorID): Option[Publish] =
if (!actor.isInstanceOf[Consumer]) None if (!actorId.actor.isInstanceOf[Consumer]) None
else if (actor.remoteAddress.isDefined) None else if (actorId.remoteAddress.isDefined) None
else Some(Publish(actor.asInstanceOf[Consumer].endpointUri, actor.uuid, true)) else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true))
} }

View file

@ -31,7 +31,7 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
mockEndpoint.reset mockEndpoint.reset
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
} }
/*
feature("Produce a message to a Camel endpoint") { feature("Produce a message to a Camel endpoint") {
scenario("produce message sync and receive response") { scenario("produce message sync and receive response") {
@ -121,7 +121,6 @@ class ProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with Before
} }
} }
*/
private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint]) private def mockEndpoint = CamelContextManager.context.getEndpoint("mock:mock", classOf[MockEndpoint])
class TestRoute extends RouteBuilder { class TestRoute extends RouteBuilder {

View file

@ -35,7 +35,6 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
var service: CamelService = CamelService.newInstance var service: CamelService = CamelService.newInstance
/*
override protected def beforeAll = { override protected def beforeAll = {
ActorRegistry.shutdownAll ActorRegistry.shutdownAll
// register test consumer before starting the CamelService // register test consumer before starting the CamelService
@ -89,5 +88,4 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi
assert(response === "received msg3") assert(response === "received msg3")
} }
} }
*/
} }

View file

@ -17,7 +17,7 @@ object PublishRequestorTest {
def onMessage(msg: Publish) = received = msg def onMessage(msg: Publish) = received = msg
} }
} }
/*
class PublishRequestorTest extends JUnitSuite { class PublishRequestorTest extends JUnitSuite {
import PublishRequestorTest._ import PublishRequestorTest._
@ -27,7 +27,7 @@ class PublishRequestorTest extends JUnitSuite {
val consumer = newActor(() => new Actor with Consumer { val consumer = newActor(() => new Actor with Consumer {
def endpointUri = "mock:test" def endpointUri = "mock:test"
protected def receive = null protected def receive = null
}) }).start
val publisher = newActor(() => new PublisherMock with Countdown[Publish]) val publisher = newActor(() => new PublisherMock with Countdown[Publish])
val requestor = newActor(() => new PublishRequestor(publisher)) val requestor = newActor(() => new PublishRequestor(publisher))
publisher.start publisher.start
@ -39,4 +39,3 @@ class PublishRequestorTest extends JUnitSuite {
requestor.stop requestor.stop
} }
} }
*/

View file

@ -24,7 +24,7 @@ object PublishTest {
protected def receive = null protected def receive = null
} }
} }
/*
class PublishTest extends JUnitSuite { class PublishTest extends JUnitSuite {
import PublishTest._ import PublishTest._
@ -34,14 +34,13 @@ class PublishTest extends JUnitSuite {
} }
@Test def shouldCreateSomePublishRequestWithActorId = { @Test def shouldCreateSomePublishRequestWithActorId = {
val publish = Publish.forConsumers(List(newActor[ConsumeAnnotatedActor])) val publish = Publish.forConsumer(newActor[ConsumeAnnotatedActor])
assert(publish === Some(Publish("mock:test1", "test", false))) assert(publish === Some(Publish("mock:test1", "test", false)))
} }
@Test def shouldCreateSomePublishRequestWithActorUuid = { @Test def shouldCreateSomePublishRequestWithActorUuid = {
val ca = newActor[ConsumerActor] val ca = newActor[ConsumerActor]
val publish = Publish.forConsumers(List(ca)) val publish = Publish.forConsumer(ca)
assert(publish === Some(Publish("mock:test2", ca.uuid, true)))
assert(publish === Some(Publish("mock:test2", ca.uuid, true))) assert(publish === Some(Publish("mock:test2", ca.uuid, true)))
} }
@ -50,4 +49,3 @@ class PublishTest extends JUnitSuite {
assert(publish === None) assert(publish === None)
} }
} }
*/

View file

@ -260,12 +260,13 @@ final class ActorID private[akka] () {
this() this()
newActorFactory = Left(Some(clazz)) newActorFactory = Left(Some(clazz))
} }
private[akka] def this(factory: () => Actor) = { private[akka] def this(factory: () => Actor) = {
this() this()
newActorFactory = Right(Some(factory)) newActorFactory = Right(Some(factory))
} }
lazy val actor: Actor = { private[akka] lazy val actor: Actor = {
val actor = newActorFactory match { val actor = newActorFactory match {
case Left(Some(clazz)) => case Left(Some(clazz)) =>
try { try {
@ -284,6 +285,12 @@ final class ActorID private[akka] () {
if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorID can not be 'null'") if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorID can not be 'null'")
actor actor
} }
/**
* Returns the class for the Actor instance that is managed by the ActorID.
*/
def actorInstanceClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]]
/** /**
* Starts up the actor and its message queue. * Starts up the actor and its message queue.
*/ */
@ -296,12 +303,12 @@ final class ActorID private[akka] () {
* Shuts down the actor its dispatcher and message queue. * Shuts down the actor its dispatcher and message queue.
* Alias for 'stop'. * Alias for 'stop'.
*/ */
protected def exit = actor.stop protected def exit: Unit = actor.stop
/** /**
* Shuts down the actor its dispatcher and message queue. * Shuts down the actor its dispatcher and message queue.
*/ */
def stop = actor.stop def stop: Unit = actor.stop
/** /**
* Is the actor running? * Is the actor running?
@ -456,7 +463,7 @@ final class ActorID private[akka] () {
/** /**
* Returns the id for the actor. * Returns the id for the actor.
*/ */
def getId = actor.getId def id = actor.getId
/** /**
* Returns the uuid for the actor. * Returns the uuid for the actor.
@ -1026,7 +1033,7 @@ trait Actor extends TransactionManagement with Logging {
val server = new RemoteServer val server = new RemoteServer
server.start(host, port) server.start(host, port)
} }
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get) RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.id, sender.get)
} }
RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None)

View file

@ -97,7 +97,7 @@ object ActorRegistry extends Logging {
actorsByUUID.put(actorId.uuid, actorId) actorsByUUID.put(actorId.uuid, actorId)
// ID // ID
val id = actorId.getId val id = actorId.id
if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId) if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId)
if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id)) if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id))
else actorsById.put(id, actorId :: Nil) else actorsById.put(id, actorId :: Nil)
@ -117,7 +117,7 @@ object ActorRegistry extends Logging {
*/ */
def unregister(actor: ActorID) = { def unregister(actor: ActorID) = {
actorsByUUID remove actor.uuid actorsByUUID remove actor.uuid
actorsById remove actor.getId actorsById remove actor.id
actorsByClassName remove actor.getClass.getName actorsByClassName remove actor.getClass.getName
// notify listeners // notify listeners
foreachListener(_ ! ActorUnregistered(actor)) foreachListener(_ ! ActorUnregistered(actor))
@ -139,6 +139,7 @@ object ActorRegistry extends Logging {
* Adds the registration <code>listener</code> this this registry's listener list. * Adds the registration <code>listener</code> this this registry's listener list.
*/ */
def addRegistrationListener(listener: ActorID) = { def addRegistrationListener(listener: ActorID) = {
listener.start
registrationListeners.add(listener) registrationListeners.add(listener)
} }
@ -146,11 +147,16 @@ object ActorRegistry extends Logging {
* Removes the registration <code>listener</code> this this registry's listener list. * Removes the registration <code>listener</code> this this registry's listener list.
*/ */
def removeRegistrationListener(listener: ActorID) = { def removeRegistrationListener(listener: ActorID) = {
listener.stop
registrationListeners.remove(listener) registrationListeners.remove(listener)
} }
private def foreachListener(f: (ActorID) => Unit) { private def foreachListener(f: (ActorID) => Unit) {
val iterator = registrationListeners.iterator val iterator = registrationListeners.iterator
while (iterator.hasNext) f(iterator.next) while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) f(listener)
else log.warning("Can't send ActorRegistryEvent to [%s] since it is not running.", listener)
}
} }
} }

View file

@ -131,7 +131,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
startLink(actorId) startLink(actorId)
remoteAddress.foreach(address => RemoteServer.actorsFor( remoteAddress.foreach(address => RemoteServer.actorsFor(
RemoteServer.Address(address.hostname, address.port)) RemoteServer.Address(address.hostname, address.port))
.actors.put(actorId.getId, actorId)) .actors.put(actorId.id, actorId))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = { val supervisor = {

View file

@ -200,8 +200,8 @@ class RemoteServer extends Logging {
*/ */
def register(actor: ActorID) = synchronized { def register(actor: ActorID) = synchronized {
if (_isRunning) { if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId) log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
} }
} }