Added interface for registering session actors, and adding unit test (which is failing now)

This commit is contained in:
Paul Pacheco 2010-11-14 13:10:13 -06:00
parent 5dff29642d
commit 67cf3788f3
3 changed files with 82 additions and 0 deletions

View file

@ -301,12 +301,14 @@ object ActorRegistry extends ListenerManagement {
private[akka] def actors(address: Address) = actorsFor(address).actors
private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid
private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories
private[akka] def typedActors(address: Address) = actorsFor(address).typedActors
private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid
private[akka] class RemoteActorSet {
private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef]
private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
}

View file

@ -302,6 +302,17 @@ class RemoteServer extends Logging with ListenerManagement {
else register(id, actorRef, actors)
}
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized {
log.debug("Registering server side remote session actor with id [%s]", id)
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), factory, actorsByUuid)
else registerPerSession(id, () => factory, actorsFactories)
}
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
if (_isRunning && !registry.contains(id)) {
if (!actorRef.isRunning) actorRef.start
@ -309,6 +320,12 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) {
if (_isRunning && !registry.contains(id)) {
registry.put(id, factory)
}
}
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
if (_isRunning && !registry.contains(id)) registry.put(id, typedActor)
}
@ -341,6 +358,18 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
/**
* Unregister Remote Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterPerSession(id: String):Unit = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote session actor with id [%s]", id)
actorsFactories.remove(id)
}
}
/**
* Unregister Remote Typed Actor by specific 'id'.
* <p/>
@ -358,8 +387,10 @@ class RemoteServer extends Logging with ListenerManagement {
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors = ActorRegistry.actors(address)
private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address)
private[akka] def actorsFactories = ActorRegistry.actorsFactories(address)
private[akka] def typedActors = ActorRegistry.typedActors(address)
private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address)
}
@ -429,6 +460,8 @@ class RemoteServerHandler(
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
val CHANNEL_INIT = "channel-init".intern
val sessionActors = new ChannelLocal();
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
/**

View file

@ -36,6 +36,22 @@ object ServerInitiatedRemoteActorSpec {
}
}
case class Login(user:String);
case class GetUser();
class RemoteStatefullSessionActorSpec extends Actor {
var user : String= _;
def receive = {
case Login(user) =>
this.user = user;
case GetUser() =>
self.reply(this.user)
}
}
object RemoteActorSpecActorAsyncSender {
val latch = new CountDownLatch(1)
}
@ -63,6 +79,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
server.register(actorOf[RemoteActorSpecActorUnidirectional])
server.register(actorOf[RemoteActorSpecActorBidirectional])
server.register(actorOf[RemoteActorSpecActorAsyncSender])
server.registerPerSession("statefull-session-actor", actorOf[RemoteStatefullSessionActorSpec])
Thread.sleep(1000)
}
@ -103,6 +120,29 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
actor.stop
}
@Test
def shouldKeepSessionInformation {
val session1 = RemoteClient.actorFor(
"statefull-session-actor",
5000L,
HOSTNAME, PORT)
val session2 = RemoteClient.actorFor(
"statefull-session-actor",
5000L,
HOSTNAME, PORT)
session1 ! Login("session1");
session2 ! Login("session2");
val result1 = session1 !! GetUser();
assert("session1" === result1.get.asInstanceOf[String])
val result2 = session2 !! GetUser();
assert("session2" === result2.get.asInstanceOf[String])
session1.stop()
session2.stop()
}
@Test
def shouldSendWithBangAndGetReplyThroughSenderRef {
implicit val timeout = 500000000L
@ -205,6 +245,13 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
server.unregister("my-service-1")
assert(server.actors.get("my-service-1") eq null, "actor unregistered")
}
@Test
def shouldRegisterAndUnregisterSession {
server.registerPerSession("my-service-1", actorOf[RemoteActorSpecActorUnidirectional])
assert(server.actorsFactories.get("my-service-1") ne null, "actor registered")
server.unregisterPerSession("my-service-1")
assert(server.actorsFactories.get("my-service-1") eq null, "actor unregistered")
}
@Test
def shouldRegisterAndUnregisterByUuid {