Fixing a lot of stuff and starting to port unit tests

This commit is contained in:
Viktor Klang 2010-12-17 16:09:21 +01:00
parent 5f651c73ba
commit 8becbad787
8 changed files with 414 additions and 448 deletions

View file

@ -80,8 +80,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
protected[akka] val guard = new ReentrantGuard protected[akka] val guard = new ReentrantGuard
private[akka] def registry: ActorRegistryInstance
/** /**
* User overridable callback/setting. * User overridable callback/setting.
* <p/> * <p/>
@ -543,7 +541,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class LocalActorRef private[akka] ( class LocalActorRef private[akka] (
private[akka] val registry: ActorRegistryInstance,
private[this] val actorFactory: () => Actor) private[this] val actorFactory: () => Actor)
extends ActorRef with ScalaActorRef { extends ActorRef with ScalaActorRef {
@ -567,7 +564,6 @@ class LocalActorRef private[akka] (
// used only for deserialization // used only for deserialization
private[akka] def this( private[akka] def this(
__registry: ActorRegistryInstance,
__uuid: Uuid, __uuid: Uuid,
__id: String, __id: String,
__hostname: String, __hostname: String,
@ -578,7 +574,7 @@ class LocalActorRef private[akka] (
__supervisor: Option[ActorRef], __supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]], __hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () => Actor) = { __factory: () => Actor) = {
this(__registry, __factory) this(__factory)
_uuid = __uuid _uuid = __uuid
id = __id id = __id
timeout = __timeout timeout = __timeout
@ -588,7 +584,7 @@ class LocalActorRef private[akka] (
hotswap = __hotswap hotswap = __hotswap
setActorSelfFields(actor,this) setActorSelfFields(actor,this)
start start
__registry.register(this) ActorRegistry.register(this) //TODO: REVISIT: Is this needed?
} }
// ========= PUBLIC FUNCTIONS ========= // ========= PUBLIC FUNCTIONS =========
@ -649,7 +645,7 @@ class LocalActorRef private[akka] (
dispatcher.detach(this) dispatcher.detach(this)
_status = ActorRefInternals.SHUTDOWN _status = ActorRefInternals.SHUTDOWN
actor.postStop actor.postStop
registry.unregister(this) ActorRegistry.unregister(this)
setActorSelfFields(actorInstance.get,null) setActorSelfFields(actorInstance.get,null)
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
} }
@ -1058,7 +1054,7 @@ class LocalActorRef private[akka] (
private def initializeActorInstance = { private def initializeActorInstance = {
actor.preStart // run actor preStart actor.preStart // run actor preStart
Actor.log.slf4j.trace("[{}] has started", toString) Actor.log.slf4j.trace("[{}] has started", toString)
registry.register(this) ActorRegistry.register(this)
} }
} }
@ -1078,13 +1074,11 @@ object RemoteActorSystemMessage {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
private[akka] case class RemoteActorRef private[akka] ( private[akka] case class RemoteActorRef private[akka] (
registry: ActorRegistryInstance, classOrServiceName: Option[String],
classOrServiceName: String,
val actorClassName: String, val actorClassName: String,
val hostname: String, val hostname: String,
val port: Int, val port: Int,
_timeout: Long, _timeout: Long,
clientManaged: Boolean, //TODO: REVISIT: ENCODE CLIENT_MANAGED INTO REMOTE PROTOCOL
loader: Option[ClassLoader], loader: Option[ClassLoader],
val actorType: ActorType = ActorType.ScalaActor) val actorType: ActorType = ActorType.ScalaActor)
extends ActorRef with ScalaActorRef { extends ActorRef with ScalaActorRef {
@ -1093,40 +1087,40 @@ private[akka] case class RemoteActorRef private[akka] (
val homeAddress = new InetSocketAddress(hostname, port) val homeAddress = new InetSocketAddress(hostname, port)
id = classOrServiceName protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed
id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id
timeout = _timeout timeout = _timeout
start start
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
registry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType) ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
timeout: Long, timeout: Long,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = registry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType) val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
} }
def start: ActorRef = synchronized { def start: ActorRef = synchronized {
_status = ActorRefInternals.RUNNING _status = ActorRefInternals.RUNNING
if (clientManaged) { if (clientManaged)
registry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
}
this this
} }
def stop: Unit = synchronized { def stop: Unit = synchronized {
if (_status == ActorRefInternals.RUNNING) { if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN _status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None) postMessageToMailbox(RemoteActorSystemMessage.Stop, None) //TODO: REVISIT: Should this be called for both server-managed and client-managed?
if (clientManaged) { if (clientManaged)
registry.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) ActorRegistry.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
registry.remote.unregister(this) //TODO: REVISIT: Why does this need to be deregistered from the server?
}
} }
} }

View file

@ -38,12 +38,13 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object ActorRegistry extends ActorRegistryInstance(ReflectiveAccess.Remote.defaultRemoteSupport)
class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => RemoteSupport]) extends ListenerManagement { object ActorRegistry extends ListenerManagement {
protected def remoteBootstrap = ReflectiveAccess.Remote.defaultRemoteSupport
private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
private val actorsById = new Index[String,ActorRef] private val actorsById = new Index[String,ActorRef]
private val remoteActorSets = Map[Address, RemoteActorSet]()
private val guard = new ReadWriteGuard private val guard = new ReadWriteGuard
/** /**
@ -230,7 +231,7 @@ class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => R
/** /**
* Handy access to the RemoteServer module * Handy access to the RemoteServer module
*/ */
lazy val remote: RemoteSupport = remoteBootstrap.map(_(this)).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath")) lazy val remote: RemoteSupport = remoteBootstrap.map(_()).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath"))
/** /**
* Creates an ActorRef out of the Actor with type T. * Creates an ActorRef out of the Actor with type T.
@ -280,7 +281,7 @@ class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => R
* val actor = actorOf(classOf[MyActor]).start * val actor = actorOf(classOf[MyActor]).start
* </pre> * </pre>
*/ */
def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(this, () => { def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => {
import ReflectiveAccess.{ createInstance, noParams, noArgs } import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException( throw new ActorInitializationException(
@ -326,7 +327,7 @@ class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => R
* val actor = actorOf(new MyActor).start * val actor = actorOf(new MyActor).start
* </pre> * </pre>
*/ */
def actorOf(factory: => Actor): ActorRef = new LocalActorRef(this,() => factory) def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory)
/** /**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when * Use to spawn out a block of code in an event-driven actor. Will shut actor down when
@ -394,41 +395,13 @@ class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => R
else actorRef.stop else actorRef.stop
} }
} else foreach(_.stop) } else foreach(_.stop)
if (Remote.isEnabled) {
remote.clear
}
actorsByUUID.clear actorsByUUID.clear
actorsById.clear actorsById.clear
log.slf4j.info("All actors have been shut down and unregistered from ActorRegistry") log.slf4j.info("All actors have been shut down and unregistered from ActorRegistry")
} }
/**
* Get the remote actors for the given server address. For internal use only.
*/
private[akka] def actorsFor(remoteServerAddress: Address): RemoteActorSet = guard.withWriteGuard {
remoteActorSets.getOrElseUpdate(remoteServerAddress, new RemoteActorSet)
}
private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) {
actorsByUuid(Address(address.getHostName, address.getPort)).putIfAbsent(uuid, actor)
}
private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) {
typedActorsByUuid(Address(address.getHostName, address.getPort)).putIfAbsent(uuid, typedActor)
}
private[akka] def actors(address: Address) = actorsFor(address).actors
private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid
private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories
private[akka] def typedActors(address: Address) = actorsFor(address).typedActors
private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid
private[akka] def typedActorsFactories(address: Address) = actorsFor(address).typedActorsFactories
private[akka] class RemoteActorSet {
private[ActorRegistryInstance] val actors = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistryInstance] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistryInstance] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef]
private[ActorRegistryInstance] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistryInstance] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistryInstance] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef]
}
} }
/** /**

View file

@ -9,13 +9,20 @@ import java.net.InetSocketAddress
import akka.actor._ import akka.actor._
import akka.util._ import akka.util._
import akka.dispatch.CompletableFuture import akka.dispatch.CompletableFuture
import akka.actor. {ActorRegistryInstance, ActorType, RemoteActorRef, ActorRef}
import akka.config.Config.{config, TIME_UNIT} import akka.config.Config.{config, TIME_UNIT}
import java.util.concurrent.ConcurrentHashMap
trait RemoteModule extends Logging { trait RemoteModule extends Logging {
def registry: ActorRegistryInstance
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: => Any): Unit protected[akka] def notifyListeners(message: => Any): Unit
private[akka] def actors: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsFactories: ConcurrentHashMap[String, () => ActorRef]
private[akka] def typedActors: ConcurrentHashMap[String, AnyRef]
private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef]
private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef]
} }
@ -23,9 +30,21 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
def shutdown { def shutdown {
this.shutdownServerModule this.shutdownServerModule
this.shutdownClientModule this.shutdownClientModule
clear
} }
protected override def manageLifeCycleOfListeners = false protected override def manageLifeCycleOfListeners = false
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] val actors = new ConcurrentHashMap[String, ActorRef]
private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[akka] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef]
private[akka] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[akka] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
private[akka] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef]
def clear {
List(actors,actorsByUuid,actorsFactories,typedActors,typedActorsByUuid,typedActorsFactories) foreach (_.clear)
}
} }
/** /**
@ -57,7 +76,7 @@ trait RemoteServerModule extends RemoteModule {
/** /**
* Starts the server up * Starts the server up
*/ */
def start(host: String, port: Int, loader: Option[ClassLoader] = None): RemoteServerModule def start(host: String = ReflectiveAccess.Remote.HOSTNAME, port: Int = ReflectiveAccess.Remote.PORT, loader: Option[ClassLoader] = None): RemoteServerModule
/** /**
* Shuts the server down * Shuts the server down

View file

@ -45,10 +45,9 @@ object ReflectiveAccess extends Logging {
//TODO: REVISIT: Make class configurable //TODO: REVISIT: Make class configurable
val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT)
protected[akka] val defaultRemoteSupport: Option[ActorRegistryInstance => RemoteSupport] = remoteSupportClass map { protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = remoteSupportClass map {
remoteClass => (registry: ActorRegistryInstance) => remoteClass => () => createInstance[RemoteSupport](remoteClass,Array[Class[_]](),Array[AnyRef]()).
createInstance[RemoteSupport](remoteClass,Array[Class[_]](classOf[ActorRegistryInstance]),Array[AnyRef](registry)). getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+
getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+
remoteClass.getName+ remoteClass.getName+
", make sure that akka-remote.jar is on the classpath")) ", make sure that akka-remote.jar is on the classpath"))
} }
@ -101,6 +100,7 @@ object ReflectiveAccess extends Logging {
} catch { } catch {
case e => case e =>
log.slf4j.warn("Could not instantiate class [{}] due to [{}]", clazz.getName, e.getCause) log.slf4j.warn("Could not instantiate class [{}] due to [{}]", clazz.getName, e.getCause)
e.printStackTrace
None None
} }

View file

@ -11,8 +11,8 @@ import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization
import akka.japi.Creator import akka.japi.Creator
import akka.actor.{ActorRegistryInstance, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, import akka.actor.{newUuid,ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid, Exit, ActorRegistry, LifeCycleMessage, ActorType => AkkaActorType} RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule} import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule}
import akka.config.Config._ import akka.config.Config._
import akka.serialization.RemoteActorSerialization._ import akka.serialization.RemoteActorSerialization._
@ -73,7 +73,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private val remoteActors = new HashMap[Address, HashSet[Uuid]] private val remoteActors = new HashMap[Address, HashSet[Uuid]]
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(registry, serviceId, implClassName, hostname, port, timeout, false, loader, AkkaActorType.TypedActor)) TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(Some(serviceId), implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor))
protected[akka] def send[T](message: Any, protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
@ -241,8 +241,6 @@ class RemoteClient private[akka] (
actorRef: ActorRef, actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]], typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType): Option[CompletableFuture[T]] = { actorType: AkkaActorType): Option[CompletableFuture[T]] = {
val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE
else None
send(createRemoteMessageProtocolBuilder( send(createRemoteMessageProtocolBuilder(
Some(actorRef), Some(actorRef),
Left(actorRef.uuid), Left(actorRef.uuid),
@ -254,21 +252,24 @@ class RemoteClient private[akka] (
senderOption, senderOption,
typedActorInfo, typedActorInfo,
actorType, actorType,
cookie if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE else None
).build, senderFuture) ).build, senderFuture)
} }
def send[T]( def send[T](
request: RemoteMessageProtocol, request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
log.slf4j.debug("sending message: {} is running {} has future {}", Array[AnyRef](request, isRunning.asInstanceOf[AnyRef], senderFuture))
if (isRunning) { if (isRunning) {
if (request.getOneWay) { if (request.getOneWay) {
connection.getChannel.write(request) connection.getChannel.write(request)
None None
} else { } else {
val futureResult = if (senderFuture.isDefined) senderFuture.get val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult) val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult)
log.slf4j.debug("Stashing away future for {}",futureUuid)
connection.getChannel.write(request) connection.getChannel.write(request)
Some(futureResult) Some(futureResult)
} }
@ -369,34 +370,32 @@ class RemoteClientHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try { try {
val result = event.getMessage event.getMessage match {
if (result.isInstanceOf[RemoteMessageProtocol]) { case reply: RemoteMessageProtocol =>
val reply = result.asInstanceOf[RemoteMessageProtocol] val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow) log.slf4j.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) log.slf4j.debug("Trying to map back to future: {}",replyUuid)
val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]] val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
if (reply.hasMessage) { if (reply.hasMessage) {
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
val message = MessageSerializer.deserialize(reply.getMessage) val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message) future.completeWithResult(message)
} else { } else {
if (reply.hasSupervisorUuid()) { if (reply.hasSupervisorUuid()) {
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
"Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid) val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
}
future.completeWithException(parseException(reply, client.loader))
} }
val exception = parseException(reply, client.loader)
future.completeWithException(exception) case other =>
} throw new RemoteClientException("Unknown message received in remote client handler: " + other, client)
futures remove replyUuid
} else {
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
client.notifyListeners(RemoteClientError(exception, client))
throw exception
} }
} catch { } catch {
case e: Exception => case e: Exception =>
@ -547,8 +546,11 @@ case class RemoteServerClientClosed(
/** /**
* Provides the implementation of the Netty remote support * Provides the implementation of the Netty remote support
*/ */
class NettyRemoteSupport(val registry: ActorRegistryInstance) class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule {
extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule { //Needed for remote testing and switching on/off under run
private[akka] val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = { protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
//TODO: REVISIT: Possible to optimize server-managed actors in local scope? //TODO: REVISIT: Possible to optimize server-managed actors in local scope?
@ -562,7 +564,7 @@ class NettyRemoteSupport(val registry: ActorRegistryInstance)
// case _ => // case _ =>
// RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) // RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader)
//} //}
RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) RemoteActorRef(Some(serviceId), className, hostname, port, timeout, loader)
} }
def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = { def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = {
@ -571,13 +573,11 @@ class NettyRemoteSupport(val registry: ActorRegistryInstance)
(host,port) match { (host,port) match {
case (Host, Port) if optimizeLocalScoped_? => case (Host, Port) if optimizeLocalScoped_? =>
registry.actorOf(clazz) //Local ActorRegistry.actorOf(clazz) //Local
case _ => case _ =>
new RemoteActorRef(registry,clazz.getName,clazz.getName,host,port,timeout,true /*Client managed*/, None) new RemoteActorRef(None,clazz.getName,host,port,timeout,None)
} }
} }
val optimizeLocalScoped_? = true
} }
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
@ -671,6 +671,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
} }
def registerByUuid(actorRef: ActorRef): Unit = guard withGuard { def registerByUuid(actorRef: ActorRef): Unit = guard withGuard {
log.slf4j.debug("Registering remote actor {} to it's uuid {}", actorRef, actorRef.uuid)
register(actorRef.uuid.toString, actorRef, actorsByUuid) register(actorRef.uuid.toString, actorRef, actorsByUuid)
} }
@ -766,13 +767,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
*/ */
def unregisterTypedPerSessionActor(id: String): Unit = def unregisterTypedPerSessionActor(id: String): Unit =
if (_isRunning.isOn) typedActorsFactories.remove(id) if (_isRunning.isOn) typedActorsFactories.remove(id)
private[akka] def actors = registry.actors(address)
private[akka] def actorsByUuid = registry.actorsByUuid(address)
private[akka] def actorsFactories = registry.actorsFactories(address)
private[akka] def typedActors = registry.typedActors(address)
private[akka] def typedActorsByUuid = registry.typedActorsByUuid(address)
private[akka] def typedActorsFactories = registry.typedActorsFactories(address)
} }
object RemoteServerSslContext { object RemoteServerSslContext {
@ -970,15 +964,18 @@ class RemoteServerHandler(
None, None,
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout). Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout).
onComplete(f => { onComplete(f => {
log.slf4j.debug("Future was completed, now flushing to remote!")
val result = f.result val result = f.result
val exception = f.exception val exception = f.exception
if (exception.isDefined) { if (exception.isDefined) {
log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get) log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get.getClass)
try { try {
channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
} catch { } catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) case e: Throwable =>
log.slf4j.debug("An error occurred in sending the reply",e)
server.notifyListeners(RemoteServerError(e, server))
} }
} }
else if (result.isDefined) { else if (result.isDefined) {
@ -1069,6 +1066,7 @@ class RemoteServerHandler(
} }
private def findActorByUuid(uuid: String) : ActorRef = { private def findActorByUuid(uuid: String) : ActorRef = {
log.slf4j.debug("Trying to find actor for uuid '{}' inside {}",uuid,server.actorsByUuid)
server.actorsByUuid.get(uuid) server.actorsByUuid.get(uuid)
} }
@ -1149,10 +1147,10 @@ class RemoteServerHandler(
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException(
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
log.slf4j.info("Creating a new remote actor [{}:{}]", name, uuid) log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name) else Class.forName(name)
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) val actorRef = ActorRegistry.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
actorRef.id = id actorRef.id = id
actorRef.timeout = timeout actorRef.timeout = timeout

View file

@ -127,7 +127,7 @@ object ActorSerialization {
messages.map(m => messages.map(m =>
RemoteActorSerialization.createRemoteMessageProtocolBuilder( RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef), Some(actorRef),
Left(actorRef.uuid), Left(actorRef.uuid), //TODO: REVISIT: generate uuid for the request
actorRef.id, actorRef.id,
actorRef.actorClassName, actorRef.actorClassName,
actorRef.timeout, actorRef.timeout,
@ -191,7 +191,6 @@ object ActorSerialization {
} }
val ar = new LocalActorRef( val ar = new LocalActorRef(
ActorRegistry,//TODO: REVISIST: Change to an implicit ActorRegistryInstance?
uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow),
protocol.getId, protocol.getId,
protocol.getOriginalAddress.getHostname, protocol.getOriginalAddress.getHostname,
@ -230,15 +229,16 @@ object RemoteActorSerialization {
*/ */
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
Actor.log.slf4j.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n {}", protocol) Actor.log.slf4j.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n {}", protocol)
RemoteActorRef( val ref = RemoteActorRef(
ActorRegistry,//TODO: REVISIST: Change to an implicit ActorRegistryInstance? Some(protocol.getClassOrServiceName),
protocol.getClassOrServiceName,
protocol.getActorClassname, protocol.getActorClassname,
protocol.getHomeAddress.getHostname, protocol.getHomeAddress.getHostname,
protocol.getHomeAddress.getPort, protocol.getHomeAddress.getPort,
protocol.getTimeout, protocol.getTimeout,
false,
loader) loader)
Actor.log.slf4j.debug("Newly deserialized RemoteActorRef has uuid: {}", ref.uuid)
ref
} }
/** /**
@ -248,12 +248,12 @@ object RemoteActorSerialization {
import ar._ import ar._
Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]", Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]",
Array[AnyRef](actorClassName, registry.remote.hostname, registry.remote.port.asInstanceOf[AnyRef])) Array[AnyRef](actorClassName, ActorSerialization.localAddress.getHostname, ActorSerialization.localAddress.getPort.asInstanceOf[AnyRef]))
registry.remote.registerByUuid(ar) ActorRegistry.remote.registerByUuid(ar)
RemoteActorRefProtocol.newBuilder RemoteActorRefProtocol.newBuilder
.setClassOrServiceName(uuid.toString) .setClassOrServiceName("uuid:"+uuid.toString)
.setActorClassname(actorClassName) .setActorClassname(actorClassName)
.setHomeAddress(ActorSerialization.localAddress) .setHomeAddress(ActorSerialization.localAddress)
.setTimeout(timeout) .setTimeout(timeout)
@ -262,7 +262,7 @@ object RemoteActorSerialization {
def createRemoteMessageProtocolBuilder( def createRemoteMessageProtocolBuilder(
actorRef: Option[ActorRef], actorRef: Option[ActorRef],
uuid: Either[Uuid, UuidProtocol], replyUuid: Either[Uuid, UuidProtocol],
actorId: String, actorId: String,
actorClassName: String, actorClassName: String,
timeout: Long, timeout: Long,
@ -273,7 +273,7 @@ object RemoteActorSerialization {
actorType: ActorType, actorType: ActorType,
secureCookie: Option[String]): RemoteMessageProtocol.Builder = { secureCookie: Option[String]): RemoteMessageProtocol.Builder = {
val uuidProtocol = uuid match { val uuidProtocol = replyUuid match {
case Left(uid) => UuidProtocol.newBuilder.setHigh(uid.getTime).setLow(uid.getClockSeqAndNode).build case Left(uid) => UuidProtocol.newBuilder.setHigh(uid.getTime).setLow(uid.getClockSeqAndNode).build
case Right(protocol) => protocol case Right(protocol) => protocol
} }
@ -298,7 +298,10 @@ object RemoteActorSerialization {
} }
val actorInfo = actorInfoBuilder.build val actorInfo = actorInfoBuilder.build
val messageBuilder = RemoteMessageProtocol.newBuilder val messageBuilder = RemoteMessageProtocol.newBuilder
.setUuid(uuidProtocol) .setUuid({
val messageUuid = newUuid
UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build
})
.setActorInfo(actorInfo) .setActorInfo(actorInfo)
.setOneWay(isOneWay) .setOneWay(isOneWay)
@ -308,10 +311,15 @@ object RemoteActorSerialization {
case Right(exception) => case Right(exception) =>
messageBuilder.setException(ExceptionProtocol.newBuilder messageBuilder.setException(ExceptionProtocol.newBuilder
.setClassname(exception.getClass.getName) .setClassname(exception.getClass.getName)
.setMessage(exception.getMessage) .setMessage(empty(exception.getMessage))
.build) .build)
} }
def empty(s: String): String = s match {
case null => ""
case s => s
}
secureCookie.foreach(messageBuilder.setCookie(_)) secureCookie.foreach(messageBuilder.setCookie(_))
//TODO: REVISIT: REMOVE //TODO: REVISIT: REMOVE

View file

@ -1,167 +1,169 @@
package akka.actor.remote package akka.actor.remote
import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite import org.scalatest.WordSpec
import org.junit.{Test, Before, After} import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient} import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient}
import akka.actor. {RemoteActorRef, ActorRegistryInstance, ActorRef, Actor} import akka.actor. {RemoteActorRef, ActorRegistry, ActorRef, Actor}
import akka.actor.Actor._
class ExpectedRemoteProblem extends RuntimeException class ExpectedRemoteProblem(msg: String) extends RuntimeException(msg)
object ClientInitiatedRemoteActorSpec { object RemoteActorSpecActorUnidirectional {
case class Send(actor: Actor) val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
object RemoteActorSpecActorUnidirectional { def receive = {
val latch = new CountDownLatch(1) case "OneWay" =>
} RemoteActorSpecActorUnidirectional.latch.countDown
class RemoteActorSpecActorUnidirectional extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "OneWay" =>
RemoteActorSpecActorUnidirectional.latch.countDown
}
}
class RemoteActorSpecActorBidirectional extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" => throw new ExpectedRemoteProblem
}
}
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
}
}
class CountDownActor(latch: CountDownLatch) extends Actor {
def receive = {
case "World" => latch.countDown
}
}
object SendOneWayAndReplySenderActor {
val latch = new CountDownLatch(1)
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: ActorRef = _
var latch: CountDownLatch = _
def sendOff = sendTo ! "Hello"
def receive = {
case msg: AnyRef =>
state = Some(msg)
SendOneWayAndReplySenderActor.latch.countDown
}
}
class MyActorCustomConstructor extends Actor {
var prefix = "default-"
var count = 0
def receive = {
case "incrPrefix" => count += 1; prefix = "" + count + "-"
case msg: String => self.reply(prefix + msg)
}
} }
} }
class ClientInitiatedRemoteActorSpec extends JUnitSuite { class RemoteActorSpecActorBidirectional extends Actor {
import ClientInitiatedRemoteActorSpec._ def receive = {
akka.config.Config.config case "Hello" =>
self.reply("World")
val HOSTNAME = "localhost" case "Failure" => throw new ExpectedRemoteProblem("expected")
val PORT1 = 9990
val PORT2 = 9991
var s1,s2: ActorRegistryInstance = null
private val unit = TimeUnit.MILLISECONDS
@Before
def init() {
s1 = new ActorRegistryInstance(Some(new NettyRemoteSupport(_)))
s2 = new ActorRegistryInstance(Some(new NettyRemoteSupport(_)))
s1.remote.start(HOSTNAME, PORT1)
s2.remote.start(HOSTNAME, PORT2)
Thread.sleep(2000)
}
@After
def finished() {
s1.remote.shutdown
s2.remote.shutdown
s1.shutdownAll
s2.shutdownAll
Thread.sleep(1000)
}
@Test
def shouldSendOneWay = {
val clientManaged = s1.actorOf[RemoteActorSpecActorUnidirectional](HOSTNAME,PORT2).start
//implicit val self = Some(s2.actorOf[RemoteActorSpecActorUnidirectional].start)
assert(clientManaged ne null)
assert(clientManaged.getClass.equals(classOf[RemoteActorRef]))
clientManaged ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
clientManaged.stop
}
@Test
def shouldSendOneWayAndReceiveReply = {
val latch = new CountDownLatch(1)
val actor = s2.actorOf[SendOneWayAndReplyReceiverActor](HOSTNAME, PORT1).start
implicit val sender = Some(s1.actorOf(new CountDownActor(latch)).start)
actor ! "OneWay"
assert(latch.await(3,TimeUnit.SECONDS))
}
@Test
def shouldSendBangBangMessageAndReceiveReply = {
val actor = s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
@Test
def shouldSendBangBangMessageAndReceiveReplyConcurrently = {
val actors = (1 to 10).map(num => { s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start }).toList
actors.map(_ !!! "Hello").foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get))
actors.foreach(_.stop)
}
@Test
def shouldRegisterActorByUuid {
val actor1 = s2.actorOf[MyActorCustomConstructor](HOSTNAME, PORT1).start
actor1 ! "incrPrefix"
assert((actor1 !! "test").get === "1-test")
actor1 ! "incrPrefix"
assert((actor1 !! "test").get === "2-test")
val actor2 = s2.actorOf[MyActorCustomConstructor](HOSTNAME, PORT1).start
assert((actor2 !! "test").get === "default-test")
actor1.stop
actor2.stop
}
@Test(expected=classOf[ExpectedRemoteProblem])
def shouldSendAndReceiveRemoteException {
implicit val timeout = 500000000L
val actor = s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start
actor !! "Failure"
actor.stop
} }
} }
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
}
}
class CountDownActor(latch: CountDownLatch) extends Actor {
def receive = {
case "World" => latch.countDown
}
}
object SendOneWayAndReplySenderActor {
val latch = new CountDownLatch(1)
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: ActorRef = _
var latch: CountDownLatch = _
def sendOff = sendTo ! "Hello"
def receive = {
case msg: AnyRef =>
state = Some(msg)
SendOneWayAndReplySenderActor.latch.countDown
}
}
class MyActorCustomConstructor extends Actor {
var prefix = "default-"
var count = 0
def receive = {
case "incrPrefix" => count += 1; prefix = "" + count + "-"
case msg: String => self.reply(prefix + msg)
}
}
@RunWith(classOf[JUnitRunner])
class ClientInitiatedRemoteActorSpec extends
WordSpec with
MustMatchers with
BeforeAndAfterAll with
BeforeAndAfterEach {
var optimizeLocal_? = ActorRegistry.remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_?
override def beforeAll() {
ActorRegistry.remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) //Can't run the test if we're eliminating all remote calls
ActorRegistry.remote.start()
}
override def afterAll() {
ActorRegistry.remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(optimizeLocal_?) //Reset optimizelocal after all tests
ActorRegistry.shutdownAll
}
override def afterEach() {
ActorRegistry.shutdownAll
super.afterEach
}
"ClientInitiatedRemoteActor" should {
val unit = TimeUnit.MILLISECONDS
val (host, port) = (ActorRegistry.remote.hostname,ActorRegistry.remote.port)
"shouldSendOneWay" in {
val clientManaged = actorOf[RemoteActorSpecActorUnidirectional](host,port).start
clientManaged must not be null
clientManaged.getClass must be (classOf[RemoteActorRef])
clientManaged ! "OneWay"
RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS) must be (true)
clientManaged.stop
}
"shouldSendOneWayAndReceiveReply" in {
val latch = new CountDownLatch(1)
val actor = actorOf[SendOneWayAndReplyReceiverActor](host,port).start
implicit val sender = Some(actorOf(new CountDownActor(latch)).start)
actor ! "Hello"
latch.await(3,TimeUnit.SECONDS) must be (true)
}
"shouldSendBangBangMessageAndReceiveReply" in {
val actor = actorOf[RemoteActorSpecActorBidirectional](host,port).start
val result = actor !! "Hello"
"World" must equal (result.get.asInstanceOf[String])
actor.stop
}
"shouldSendBangBangMessageAndReceiveReplyConcurrently" in {
val actors = (1 to 10).map(num => { actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList
actors.map(_ !!! "Hello") foreach { future =>
"World" must equal (future.await.result.asInstanceOf[Option[String]].get)
}
actors.foreach(_.stop)
}
"shouldRegisterActorByUuid" in {
val actor1 = actorOf[MyActorCustomConstructor](host, port).start
val actor2 = actorOf[MyActorCustomConstructor](host, port).start
actor1 ! "incrPrefix"
(actor1 !! "test").get must equal ("1-test")
actor1 ! "incrPrefix"
(actor1 !! "test").get must equal ("2-test")
(actor2 !! "test").get must equal ("default-test")
actor1.stop
actor2.stop
}
"shouldSendAndReceiveRemoteException" in {
val actor = actorOf[RemoteActorSpecActorBidirectional](host, port).start
try {
implicit val timeout = 500000000L
val f = (actor !!! "Failure").await.resultOrException
fail("Shouldn't get here!!!")
} catch {
case e: ExpectedRemoteProblem =>
}
actor.stop
}
}
}

View file

@ -1,29 +1,31 @@
package akka.actor.remote package akka.actor.remote
import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite import org.scalatest.WordSpec
import org.junit.{Test, Before, After} import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.util._ import akka.util._
import akka.remote.{RemoteServer, RemoteClient}
import akka.actor.Actor._ import akka.actor.Actor._
import akka.actor.{ActorRegistry, ActorRef, Actor} import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient}
object ServerInitiatedRemoteActorSpec { object ServerInitiatedRemoteActorSpec {
val HOSTNAME = "localhost"
val PORT = 9990
var server: RemoteServer = null
case class Send(actor: ActorRef) case class Send(actor: ActorRef)
object RemoteActorSpecActorUnidirectional { class ReplyHandlerActor(latch: CountDownLatch, expect: String) extends Actor {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
def receive = { def receive = {
case "OneWay" => case x: String if x == expect => latch.countDown
RemoteActorSpecActorUnidirectional.latch.countDown }
}
def replyHandler(latch: CountDownLatch, expect: String) = Some(actorOf(new ReplyHandlerActor(latch, expect)).start)
class RemoteActorSpecActorUnidirectional extends Actor {
def receive = {
case "Ping" => self.reply_?("Pong")
} }
} }
@ -51,170 +53,140 @@ object ServerInitiatedRemoteActorSpec {
} }
} }
class ServerInitiatedRemoteActorSpec extends JUnitSuite { @RunWith(classOf[JUnitRunner])
class ServerInitiatedRemoteActorSpec extends
WordSpec with
MustMatchers with
BeforeAndAfterAll with
BeforeAndAfterEach {
import ServerInitiatedRemoteActorSpec._ import ServerInitiatedRemoteActorSpec._
import ActorRegistry.remote
private val unit = TimeUnit.MILLISECONDS private val unit = TimeUnit.MILLISECONDS
val (host, port) = (remote.hostname,remote.port)
@Before var optimizeLocal_? = remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_?
def init {
server = new RemoteServer()
server.start(HOSTNAME, PORT) override def beforeAll() {
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) //Can't run the test if we're eliminating all remote calls
server.register(actorOf[RemoteActorSpecActorUnidirectional]) remote.start()
server.register(actorOf[RemoteActorSpecActorBidirectional])
server.register(actorOf[RemoteActorSpecActorAsyncSender])
Thread.sleep(1000)
} }
// make sure the servers postStop cleanly after the test has finished override def afterAll() {
@After remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(optimizeLocal_?) //Reset optimizelocal after all tests
def finished {
try {
server.shutdown
val s2 = RemoteServer.serverFor(HOSTNAME, PORT + 1)
if (s2.isDefined) s2.get.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
} }
@Test override def afterEach() {
def shouldSendWithBang { ActorRegistry.shutdownAll
val actor = RemoteClient.actorFor( super.afterEach
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional",
5000L,
HOSTNAME, PORT)
val result = actor ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
actor.stop
} }
@Test "Server-managed remote actors" should {
def shouldSendWithBangBangAndGetReply { "sendWithBang" in {
val actor = RemoteClient.actorFor( val latch = new CountDownLatch(1)
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", implicit val sender = replyHandler(latch, "Pong")
5000L, remote.register(actorOf[RemoteActorSpecActorUnidirectional])
HOSTNAME, PORT) val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional",5000L,host, port)
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
@Test actor ! "Ping"
def shouldSendWithBangAndGetReplyThroughSenderRef { latch.await(1, TimeUnit.SECONDS) must be (true)
implicit val timeout = 500000000L
val actor = RemoteClient.actorFor(
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",
timeout,
HOSTNAME, PORT)
val sender = actorOf[RemoteActorSpecActorAsyncSender]
sender.homeAddress = (HOSTNAME, PORT + 1)
sender.start
sender ! Send(actor)
assert(RemoteActorSpecActorAsyncSender.latch.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test
def shouldSendWithBangBangAndReplyWithException {
implicit val timeout = 500000000L
val actor = RemoteClient.actorFor(
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional",
timeout,
HOSTNAME, PORT)
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
@Test
def reflectiveAccessShouldNotCreateNewRemoteServerObject {
val server1 = new RemoteServer()
server1.start("localhost", 9990)
var found = RemoteServer.serverFor("localhost", 9990)
assert(found.isDefined, "sever not found")
val a = actorOf( new Actor { def receive = { case _ => } } ).start
found = RemoteServer.serverFor("localhost", 9990)
assert(found.isDefined, "sever not found after creating an actor")
} }
"sendWithBangBangAndGetReply" in {
remote.register(actorOf[RemoteActorSpecActorBidirectional])
val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", 5000L,host, port)
(actor !! "Hello").as[String].get must equal ("World")
}
@Test "sendWithBangAndGetReplyThroughSenderRef" in {
def shouldNotRecreateRegisteredActor { remote.register(actorOf[RemoteActorSpecActorBidirectional])
server.register(actorOf[RemoteActorSpecActorUnidirectional]) implicit val timeout = 500000000L
val actor = RemoteClient.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT) val actor = remote.actorFor(
val numberOfActorsInRegistry = ActorRegistry.actors.length "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout,host, port)
actor ! "OneWay" val sender = actorOf[RemoteActorSpecActorAsyncSender].start
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) sender ! Send(actor)
assert(numberOfActorsInRegistry === ActorRegistry.actors.length) RemoteActorSpecActorAsyncSender.latch.await(1, TimeUnit.SECONDS) must be (true)
actor.stop }
"sendWithBangBangAndReplyWithException" in {
remote.register(actorOf[RemoteActorSpecActorBidirectional])
implicit val timeout = 500000000L
val actor = remote.actorFor(
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, host, port)
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e => e.getMessage must equal ("Expected exception; to test fault-tolerance")
}
}
"notRecreateRegisteredActor" in {
val latch = new CountDownLatch(1)
implicit val sender = replyHandler(latch, "Pong")
remote.register(actorOf[RemoteActorSpecActorUnidirectional])
val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", host, port)
val numberOfActorsInRegistry = ActorRegistry.actors.length
actor ! "Ping"
latch.await(1, TimeUnit.SECONDS) must be (true)
numberOfActorsInRegistry must equal (ActorRegistry.actors.length)
}
"UseServiceNameAsIdForRemoteActorRef" in {
val latch = new CountDownLatch(3)
implicit val sender = replyHandler(latch, "Pong")
remote.register(actorOf[RemoteActorSpecActorUnidirectional])
remote.register("my-service", actorOf[RemoteActorSpecActorUnidirectional])
val actor1 = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", host, port)
val actor2 = remote.actorFor("my-service", host, port)
val actor3 = remote.actorFor("my-service", host, port)
actor1 ! "Ping"
actor2 ! "Ping"
actor3 ! "Ping"
latch.await(1, TimeUnit.SECONDS) must be (true)
actor1.uuid must not equal actor2.uuid
actor1.uuid must not equal actor3.uuid
actor1.id must not equal actor2.id
actor2.id must equal (actor3.id)
}
"shouldFindActorByUuid" in {
val latch = new CountDownLatch(2)
implicit val sender = replyHandler(latch, "Pong")
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
val actor2 = actorOf[RemoteActorSpecActorUnidirectional]
remote.register("uuid:" + actor1.uuid, actor1)
remote.register("my-service", actor2)
val ref1 = remote.actorFor("uuid:" + actor1.uuid, host, port)
val ref2 = remote.actorFor("my-service", host, port)
ref1 ! "Ping"
ref2 ! "Ping"
latch.await(1, TimeUnit.SECONDS) must be (true)
}
"shouldRegisterAndUnregister" in {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
remote.register("my-service-1", actor1)
remote.actors.get("my-service-1") must not be null
remote.unregister("my-service-1")
remote.actors.get("my-service-1") must be (null)
}
"shouldRegisterAndUnregisterByUuid" in {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
val uuid = "uuid:" + actor1.uuid
remote.register(uuid, actor1)
remote.actorsByUuid.get(actor1.uuid.toString) must not be null
remote.unregister(uuid)
remote.actorsByUuid.get(actor1.uuid) must be (null)
}
} }
@Test
def shouldUseServiceNameAsIdForRemoteActorRef {
server.register(actorOf[RemoteActorSpecActorUnidirectional])
server.register("my-service", actorOf[RemoteActorSpecActorUnidirectional])
val actor1 = RemoteClient.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT)
val actor2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
val actor3 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
actor1 ! "OneWay"
actor2 ! "OneWay"
actor3 ! "OneWay"
assert(actor1.uuid != actor2.uuid)
assert(actor1.uuid != actor3.uuid)
assert(actor1.id != actor2.id)
assert(actor2.id == actor3.id)
}
@Test
def shouldFindActorByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
val actor2 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
server.register("my-service", actor2)
val ref1 = RemoteClient.actorFor("uuid:" + actor1.uuid, HOSTNAME, PORT)
val ref2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
ref1 ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
ref1.stop
ref2 ! "OneWay"
ref2.stop
}
@Test
def shouldRegisterAndUnregister {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("my-service-1", actor1)
assert(server.actors.get("my-service-1") ne null, "actor registered")
server.unregister("my-service-1")
assert(server.actors.get("my-service-1") eq null, "actor unregistered")
}
@Test
def shouldRegisterAndUnregisterByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
assert(server.actorsByUuid.get(actor1.uuid.toString) ne null, "actor registered")
server.unregister("uuid:" + actor1.uuid)
assert(server.actorsByUuid.get(actor1.uuid) eq null, "actor unregistered")
}
} }