diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c97edba37e..918edfdd00 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -80,8 +80,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None protected[akka] val guard = new ReentrantGuard - private[akka] def registry: ActorRegistryInstance - /** * User overridable callback/setting. *
@@ -543,7 +541,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * @author Jonas Bonér */ class LocalActorRef private[akka] ( - private[akka] val registry: ActorRegistryInstance, private[this] val actorFactory: () => Actor) extends ActorRef with ScalaActorRef { @@ -567,7 +564,6 @@ class LocalActorRef private[akka] ( // used only for deserialization private[akka] def this( - __registry: ActorRegistryInstance, __uuid: Uuid, __id: String, __hostname: String, @@ -578,7 +574,7 @@ class LocalActorRef private[akka] ( __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], __factory: () => Actor) = { - this(__registry, __factory) + this(__factory) _uuid = __uuid id = __id timeout = __timeout @@ -588,7 +584,7 @@ class LocalActorRef private[akka] ( hotswap = __hotswap setActorSelfFields(actor,this) start - __registry.register(this) + ActorRegistry.register(this) //TODO: REVISIT: Is this needed? } // ========= PUBLIC FUNCTIONS ========= @@ -649,7 +645,7 @@ class LocalActorRef private[akka] ( dispatcher.detach(this) _status = ActorRefInternals.SHUTDOWN actor.postStop - registry.unregister(this) + ActorRegistry.unregister(this) setActorSelfFields(actorInstance.get,null) } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } @@ -1058,7 +1054,7 @@ class LocalActorRef private[akka] ( private def initializeActorInstance = { actor.preStart // run actor preStart Actor.log.slf4j.trace("[{}] has started", toString) - registry.register(this) + ActorRegistry.register(this) } } @@ -1078,13 +1074,11 @@ object RemoteActorSystemMessage { * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( - registry: ActorRegistryInstance, - classOrServiceName: String, + classOrServiceName: Option[String], val actorClassName: String, val hostname: String, val port: Int, _timeout: Long, - clientManaged: Boolean, //TODO: REVISIT: ENCODE CLIENT_MANAGED INTO REMOTE PROTOCOL loader: Option[ClassLoader], val actorType: ActorType = ActorType.ScalaActor) extends ActorRef with ScalaActorRef { @@ -1093,40 +1087,40 @@ private[akka] case class RemoteActorRef private[akka] ( val homeAddress = new InetSocketAddress(hostname, port) - id = classOrServiceName + protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed + + id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id + timeout = _timeout start def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - registry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType) + ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = registry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType) + val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } def start: ActorRef = synchronized { _status = ActorRefInternals.RUNNING - if (clientManaged) { - registry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) - } + if (clientManaged) + ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) this } def stop: Unit = synchronized { if (_status == ActorRefInternals.RUNNING) { _status = ActorRefInternals.SHUTDOWN - postMessageToMailbox(RemoteActorSystemMessage.Stop, None) - if (clientManaged) { - registry.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) - registry.remote.unregister(this) //TODO: REVISIT: Why does this need to be deregistered from the server? - } + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) //TODO: REVISIT: Should this be called for both server-managed and client-managed? + if (clientManaged) + ActorRegistry.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index f966928965..620aea9ecc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -38,12 +38,13 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * * @author Jonas Bonér */ -object ActorRegistry extends ActorRegistryInstance(ReflectiveAccess.Remote.defaultRemoteSupport) -class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => RemoteSupport]) extends ListenerManagement { +object ActorRegistry extends ListenerManagement { + + protected def remoteBootstrap = ReflectiveAccess.Remote.defaultRemoteSupport + private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] private val actorsById = new Index[String,ActorRef] - private val remoteActorSets = Map[Address, RemoteActorSet]() private val guard = new ReadWriteGuard /** @@ -230,7 +231,7 @@ class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => R /** * Handy access to the RemoteServer module */ - lazy val remote: RemoteSupport = remoteBootstrap.map(_(this)).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath")) + lazy val remote: RemoteSupport = remoteBootstrap.map(_()).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath")) /** * Creates an ActorRef out of the Actor with type T. @@ -280,7 +281,7 @@ class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => R * val actor = actorOf(classOf[MyActor]).start * */ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(this, () => { + def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => { import ReflectiveAccess.{ createInstance, noParams, noArgs } createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( throw new ActorInitializationException( @@ -326,7 +327,7 @@ class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => R * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor): ActorRef = new LocalActorRef(this,() => factory) + def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory) /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when @@ -394,41 +395,13 @@ class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => R else actorRef.stop } } else foreach(_.stop) + if (Remote.isEnabled) { + remote.clear + } actorsByUUID.clear actorsById.clear log.slf4j.info("All actors have been shut down and unregistered from ActorRegistry") } - - /** - * Get the remote actors for the given server address. For internal use only. - */ - private[akka] def actorsFor(remoteServerAddress: Address): RemoteActorSet = guard.withWriteGuard { - remoteActorSets.getOrElseUpdate(remoteServerAddress, new RemoteActorSet) - } - - private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) { - actorsByUuid(Address(address.getHostName, address.getPort)).putIfAbsent(uuid, actor) - } - - private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) { - typedActorsByUuid(Address(address.getHostName, address.getPort)).putIfAbsent(uuid, typedActor) - } - - private[akka] def actors(address: Address) = actorsFor(address).actors - private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid - private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories - private[akka] def typedActors(address: Address) = actorsFor(address).typedActors - private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid - private[akka] def typedActorsFactories(address: Address) = actorsFor(address).typedActorsFactories - - private[akka] class RemoteActorSet { - private[ActorRegistryInstance] val actors = new ConcurrentHashMap[String, ActorRef] - private[ActorRegistryInstance] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] - private[ActorRegistryInstance] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] - private[ActorRegistryInstance] val typedActors = new ConcurrentHashMap[String, AnyRef] - private[ActorRegistryInstance] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] - private[ActorRegistryInstance] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] - } } /** diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index cf246e4ae4..ff2fbcf3cb 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -9,13 +9,20 @@ import java.net.InetSocketAddress import akka.actor._ import akka.util._ import akka.dispatch.CompletableFuture -import akka.actor. {ActorRegistryInstance, ActorType, RemoteActorRef, ActorRef} import akka.config.Config.{config, TIME_UNIT} +import java.util.concurrent.ConcurrentHashMap trait RemoteModule extends Logging { - def registry: ActorRegistryInstance def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope protected[akka] def notifyListeners(message: => Any): Unit + + + private[akka] def actors: ConcurrentHashMap[String, ActorRef] + private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] + private[akka] def actorsFactories: ConcurrentHashMap[String, () => ActorRef] + private[akka] def typedActors: ConcurrentHashMap[String, AnyRef] + private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef] + private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef] } @@ -23,9 +30,21 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule def shutdown { this.shutdownServerModule this.shutdownClientModule + clear } protected override def manageLifeCycleOfListeners = false protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) + + private[akka] val actors = new ConcurrentHashMap[String, ActorRef] + private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private[akka] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] + private[akka] val typedActors = new ConcurrentHashMap[String, AnyRef] + private[akka] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] + private[akka] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] + + def clear { + List(actors,actorsByUuid,actorsFactories,typedActors,typedActorsByUuid,typedActorsFactories) foreach (_.clear) + } } /** @@ -57,7 +76,7 @@ trait RemoteServerModule extends RemoteModule { /** * Starts the server up */ - def start(host: String, port: Int, loader: Option[ClassLoader] = None): RemoteServerModule + def start(host: String = ReflectiveAccess.Remote.HOSTNAME, port: Int = ReflectiveAccess.Remote.PORT, loader: Option[ClassLoader] = None): RemoteServerModule /** * Shuts the server down diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index a923b52dd2..1a02136f20 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -45,10 +45,9 @@ object ReflectiveAccess extends Logging { //TODO: REVISIT: Make class configurable val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) - protected[akka] val defaultRemoteSupport: Option[ActorRegistryInstance => RemoteSupport] = remoteSupportClass map { - remoteClass => (registry: ActorRegistryInstance) => - createInstance[RemoteSupport](remoteClass,Array[Class[_]](classOf[ActorRegistryInstance]),Array[AnyRef](registry)). - getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+ + protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = remoteSupportClass map { + remoteClass => () => createInstance[RemoteSupport](remoteClass,Array[Class[_]](),Array[AnyRef]()). + getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+ remoteClass.getName+ ", make sure that akka-remote.jar is on the classpath")) } @@ -101,6 +100,7 @@ object ReflectiveAccess extends Logging { } catch { case e => log.slf4j.warn("Could not instantiate class [{}] due to [{}]", clazz.getName, e.getCause) + e.printStackTrace None } diff --git a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala index 459eebcaf9..ab63d15b64 100644 --- a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala @@ -11,8 +11,8 @@ import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.config.ConfigurationException import akka.serialization.RemoteActorSerialization import akka.japi.Creator -import akka.actor.{ActorRegistryInstance, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, - RemoteActorSystemMessage, uuidFrom, Uuid, Exit, ActorRegistry, LifeCycleMessage, ActorType => AkkaActorType} +import akka.actor.{newUuid,ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, + RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule} import akka.config.Config._ import akka.serialization.RemoteActorSerialization._ @@ -73,7 +73,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem private val remoteActors = new HashMap[Address, HashSet[Uuid]] protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = - TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(registry, serviceId, implClassName, hostname, port, timeout, false, loader, AkkaActorType.TypedActor)) + TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(Some(serviceId), implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor)) protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], @@ -241,8 +241,6 @@ class RemoteClient private[akka] ( actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], actorType: AkkaActorType): Option[CompletableFuture[T]] = { - val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE - else None send(createRemoteMessageProtocolBuilder( Some(actorRef), Left(actorRef.uuid), @@ -254,21 +252,24 @@ class RemoteClient private[akka] ( senderOption, typedActorInfo, actorType, - cookie + if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE else None ).build, senderFuture) } def send[T]( request: RemoteMessageProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + log.slf4j.debug("sending message: {} is running {} has future {}", Array[AnyRef](request, isRunning.asInstanceOf[AnyRef], senderFuture)) if (isRunning) { if (request.getOneWay) { connection.getChannel.write(request) None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) - futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult) + else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) + futures.put(futureUuid, futureResult) + log.slf4j.debug("Stashing away future for {}",futureUuid) connection.getChannel.write(request) Some(futureResult) } @@ -369,34 +370,32 @@ class RemoteClientHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { try { - val result = event.getMessage - if (result.isInstanceOf[RemoteMessageProtocol]) { - val reply = result.asInstanceOf[RemoteMessageProtocol] - val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow) - log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) - val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]] - if (reply.hasMessage) { - if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") - val message = MessageSerializer.deserialize(reply.getMessage) - future.completeWithResult(message) - } else { - if (reply.hasSupervisorUuid()) { - val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( - "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") - val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( - "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) + event.getMessage match { + case reply: RemoteMessageProtocol => + val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) + log.slf4j.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) + log.slf4j.debug("Trying to map back to future: {}",replyUuid) + val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] + if (reply.hasMessage) { + if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") + val message = MessageSerializer.deserialize(reply.getMessage) + future.completeWithResult(message) + } else { + if (reply.hasSupervisorUuid()) { + val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) + if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( + "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") + val supervisedActor = supervisors.get(supervisorUuid) + if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( + "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") + else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) + } + + future.completeWithException(parseException(reply, client.loader)) } - val exception = parseException(reply, client.loader) - future.completeWithException(exception) - } - futures remove replyUuid - } else { - val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) - client.notifyListeners(RemoteClientError(exception, client)) - throw exception + + case other => + throw new RemoteClientException("Unknown message received in remote client handler: " + other, client) } } catch { case e: Exception => @@ -547,8 +546,11 @@ case class RemoteServerClientClosed( /** * Provides the implementation of the Netty remote support */ -class NettyRemoteSupport(val registry: ActorRegistryInstance) - extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule { +class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule { + //Needed for remote testing and switching on/off under run + private[akka] val optimizeLocal = new AtomicBoolean(true) + + def optimizeLocalScoped_?() = optimizeLocal.get protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = { //TODO: REVISIT: Possible to optimize server-managed actors in local scope? @@ -562,7 +564,7 @@ class NettyRemoteSupport(val registry: ActorRegistryInstance) // case _ => // RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) //} - RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader) + RemoteActorRef(Some(serviceId), className, hostname, port, timeout, loader) } def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = { @@ -571,13 +573,11 @@ class NettyRemoteSupport(val registry: ActorRegistryInstance) (host,port) match { case (Host, Port) if optimizeLocalScoped_? => - registry.actorOf(clazz) //Local + ActorRegistry.actorOf(clazz) //Local case _ => - new RemoteActorRef(registry,clazz.getName,clazz.getName,host,port,timeout,true /*Client managed*/, None) + new RemoteActorRef(None,clazz.getName,host,port,timeout,None) } } - - val optimizeLocalScoped_? = true } trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => @@ -671,6 +671,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } def registerByUuid(actorRef: ActorRef): Unit = guard withGuard { + log.slf4j.debug("Registering remote actor {} to it's uuid {}", actorRef, actorRef.uuid) register(actorRef.uuid.toString, actorRef, actorsByUuid) } @@ -766,13 +767,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => */ def unregisterTypedPerSessionActor(id: String): Unit = if (_isRunning.isOn) typedActorsFactories.remove(id) - - private[akka] def actors = registry.actors(address) - private[akka] def actorsByUuid = registry.actorsByUuid(address) - private[akka] def actorsFactories = registry.actorsFactories(address) - private[akka] def typedActors = registry.typedActors(address) - private[akka] def typedActorsByUuid = registry.typedActorsByUuid(address) - private[akka] def typedActorsFactories = registry.typedActorsFactories(address) } object RemoteServerSslContext { @@ -970,15 +964,18 @@ class RemoteServerHandler( None, Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout). onComplete(f => { + log.slf4j.debug("Future was completed, now flushing to remote!") val result = f.result val exception = f.exception if (exception.isDefined) { - log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get) + log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get.getClass) try { channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) } catch { - case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) + case e: Throwable => + log.slf4j.debug("An error occurred in sending the reply",e) + server.notifyListeners(RemoteServerError(e, server)) } } else if (result.isDefined) { @@ -1069,6 +1066,7 @@ class RemoteServerHandler( } private def findActorByUuid(uuid: String) : ActorRef = { + log.slf4j.debug("Trying to find actor for uuid '{}' inside {}",uuid,server.actorsByUuid) server.actorsByUuid.get(uuid) } @@ -1149,10 +1147,10 @@ class RemoteServerHandler( if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException( "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") - log.slf4j.info("Creating a new remote actor [{}:{}]", name, uuid) + log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) - val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) + val actorRef = ActorRegistry.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) actorRef.id = id actorRef.timeout = timeout diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 10eed2c362..6032818e09 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -127,7 +127,7 @@ object ActorSerialization { messages.map(m => RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), - Left(actorRef.uuid), + Left(actorRef.uuid), //TODO: REVISIT: generate uuid for the request actorRef.id, actorRef.actorClassName, actorRef.timeout, @@ -191,7 +191,6 @@ object ActorSerialization { } val ar = new LocalActorRef( - ActorRegistry,//TODO: REVISIST: Change to an implicit ActorRegistryInstance? uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), protocol.getId, protocol.getOriginalAddress.getHostname, @@ -230,15 +229,16 @@ object RemoteActorSerialization { */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { Actor.log.slf4j.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n {}", protocol) - RemoteActorRef( - ActorRegistry,//TODO: REVISIST: Change to an implicit ActorRegistryInstance? - protocol.getClassOrServiceName, + val ref = RemoteActorRef( + Some(protocol.getClassOrServiceName), protocol.getActorClassname, protocol.getHomeAddress.getHostname, protocol.getHomeAddress.getPort, protocol.getTimeout, - false, loader) + + Actor.log.slf4j.debug("Newly deserialized RemoteActorRef has uuid: {}", ref.uuid) + ref } /** @@ -248,12 +248,12 @@ object RemoteActorSerialization { import ar._ Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]", - Array[AnyRef](actorClassName, registry.remote.hostname, registry.remote.port.asInstanceOf[AnyRef])) + Array[AnyRef](actorClassName, ActorSerialization.localAddress.getHostname, ActorSerialization.localAddress.getPort.asInstanceOf[AnyRef])) - registry.remote.registerByUuid(ar) + ActorRegistry.remote.registerByUuid(ar) RemoteActorRefProtocol.newBuilder - .setClassOrServiceName(uuid.toString) + .setClassOrServiceName("uuid:"+uuid.toString) .setActorClassname(actorClassName) .setHomeAddress(ActorSerialization.localAddress) .setTimeout(timeout) @@ -262,7 +262,7 @@ object RemoteActorSerialization { def createRemoteMessageProtocolBuilder( actorRef: Option[ActorRef], - uuid: Either[Uuid, UuidProtocol], + replyUuid: Either[Uuid, UuidProtocol], actorId: String, actorClassName: String, timeout: Long, @@ -273,7 +273,7 @@ object RemoteActorSerialization { actorType: ActorType, secureCookie: Option[String]): RemoteMessageProtocol.Builder = { - val uuidProtocol = uuid match { + val uuidProtocol = replyUuid match { case Left(uid) => UuidProtocol.newBuilder.setHigh(uid.getTime).setLow(uid.getClockSeqAndNode).build case Right(protocol) => protocol } @@ -298,7 +298,10 @@ object RemoteActorSerialization { } val actorInfo = actorInfoBuilder.build val messageBuilder = RemoteMessageProtocol.newBuilder - .setUuid(uuidProtocol) + .setUuid({ + val messageUuid = newUuid + UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build + }) .setActorInfo(actorInfo) .setOneWay(isOneWay) @@ -308,10 +311,15 @@ object RemoteActorSerialization { case Right(exception) => messageBuilder.setException(ExceptionProtocol.newBuilder .setClassname(exception.getClass.getName) - .setMessage(exception.getMessage) + .setMessage(empty(exception.getMessage)) .build) } + def empty(s: String): String = s match { + case null => "" + case s => s + } + secureCookie.foreach(messageBuilder.setCookie(_)) //TODO: REVISIT: REMOVE diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index d5ee009142..7213d856b6 100644 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -1,167 +1,169 @@ package akka.actor.remote import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.junit.JUnitSuite -import org.junit.{Test, Before, After} +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith import akka.dispatch.Dispatchers import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient} -import akka.actor. {RemoteActorRef, ActorRegistryInstance, ActorRef, Actor} +import akka.actor. {RemoteActorRef, ActorRegistry, ActorRef, Actor} +import akka.actor.Actor._ -class ExpectedRemoteProblem extends RuntimeException +class ExpectedRemoteProblem(msg: String) extends RuntimeException(msg) -object ClientInitiatedRemoteActorSpec { - case class Send(actor: Actor) +object RemoteActorSpecActorUnidirectional { + val latch = new CountDownLatch(1) +} +class RemoteActorSpecActorUnidirectional extends Actor { + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) - object RemoteActorSpecActorUnidirectional { - val latch = new CountDownLatch(1) - } - class RemoteActorSpecActorUnidirectional extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) - - def receive = { - case "OneWay" => - RemoteActorSpecActorUnidirectional.latch.countDown - } - } - - class RemoteActorSpecActorBidirectional extends Actor { - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => throw new ExpectedRemoteProblem - } - } - - class SendOneWayAndReplyReceiverActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") - } - } - - class CountDownActor(latch: CountDownLatch) extends Actor { - def receive = { - case "World" => latch.countDown - } - } - - object SendOneWayAndReplySenderActor { - val latch = new CountDownLatch(1) - } - class SendOneWayAndReplySenderActor extends Actor { - var state: Option[AnyRef] = None - var sendTo: ActorRef = _ - var latch: CountDownLatch = _ - - def sendOff = sendTo ! "Hello" - - def receive = { - case msg: AnyRef => - state = Some(msg) - SendOneWayAndReplySenderActor.latch.countDown - } - } - - class MyActorCustomConstructor extends Actor { - var prefix = "default-" - var count = 0 - def receive = { - case "incrPrefix" => count += 1; prefix = "" + count + "-" - case msg: String => self.reply(prefix + msg) - } + def receive = { + case "OneWay" => + RemoteActorSpecActorUnidirectional.latch.countDown } } -class ClientInitiatedRemoteActorSpec extends JUnitSuite { - import ClientInitiatedRemoteActorSpec._ - akka.config.Config.config - - val HOSTNAME = "localhost" - val PORT1 = 9990 - val PORT2 = 9991 - var s1,s2: ActorRegistryInstance = null - - private val unit = TimeUnit.MILLISECONDS - - @Before - def init() { - s1 = new ActorRegistryInstance(Some(new NettyRemoteSupport(_))) - s2 = new ActorRegistryInstance(Some(new NettyRemoteSupport(_))) - s1.remote.start(HOSTNAME, PORT1) - s2.remote.start(HOSTNAME, PORT2) - Thread.sleep(2000) - } - - @After - def finished() { - s1.remote.shutdown - s2.remote.shutdown - s1.shutdownAll - s2.shutdownAll - Thread.sleep(1000) - } - - @Test - def shouldSendOneWay = { - val clientManaged = s1.actorOf[RemoteActorSpecActorUnidirectional](HOSTNAME,PORT2).start - //implicit val self = Some(s2.actorOf[RemoteActorSpecActorUnidirectional].start) - assert(clientManaged ne null) - assert(clientManaged.getClass.equals(classOf[RemoteActorRef])) - clientManaged ! "OneWay" - assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) - clientManaged.stop - } - - - @Test - def shouldSendOneWayAndReceiveReply = { - val latch = new CountDownLatch(1) - val actor = s2.actorOf[SendOneWayAndReplyReceiverActor](HOSTNAME, PORT1).start - implicit val sender = Some(s1.actorOf(new CountDownActor(latch)).start) - - actor ! "OneWay" - - assert(latch.await(3,TimeUnit.SECONDS)) - } - - @Test - def shouldSendBangBangMessageAndReceiveReply = { - val actor = s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start - val result = actor !! "Hello" - assert("World" === result.get.asInstanceOf[String]) - actor.stop - } - - @Test - def shouldSendBangBangMessageAndReceiveReplyConcurrently = { - val actors = (1 to 10).map(num => { s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start }).toList - actors.map(_ !!! "Hello").foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get)) - actors.foreach(_.stop) - } - - @Test - def shouldRegisterActorByUuid { - val actor1 = s2.actorOf[MyActorCustomConstructor](HOSTNAME, PORT1).start - actor1 ! "incrPrefix" - assert((actor1 !! "test").get === "1-test") - actor1 ! "incrPrefix" - assert((actor1 !! "test").get === "2-test") - - val actor2 = s2.actorOf[MyActorCustomConstructor](HOSTNAME, PORT1).start - - assert((actor2 !! "test").get === "default-test") - - actor1.stop - actor2.stop - } - - @Test(expected=classOf[ExpectedRemoteProblem]) - def shouldSendAndReceiveRemoteException { - implicit val timeout = 500000000L - val actor = s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start - actor !! "Failure" - actor.stop +class RemoteActorSpecActorBidirectional extends Actor { + def receive = { + case "Hello" => + self.reply("World") + case "Failure" => throw new ExpectedRemoteProblem("expected") } } +class SendOneWayAndReplyReceiverActor extends Actor { + def receive = { + case "Hello" => + self.reply("World") + } +} + +class CountDownActor(latch: CountDownLatch) extends Actor { + def receive = { + case "World" => latch.countDown + } +} + +object SendOneWayAndReplySenderActor { + val latch = new CountDownLatch(1) +} +class SendOneWayAndReplySenderActor extends Actor { + var state: Option[AnyRef] = None + var sendTo: ActorRef = _ + var latch: CountDownLatch = _ + + def sendOff = sendTo ! "Hello" + + def receive = { + case msg: AnyRef => + state = Some(msg) + SendOneWayAndReplySenderActor.latch.countDown + } +} + +class MyActorCustomConstructor extends Actor { + var prefix = "default-" + var count = 0 + def receive = { + case "incrPrefix" => count += 1; prefix = "" + count + "-" + case msg: String => self.reply(prefix + msg) + } +} + +@RunWith(classOf[JUnitRunner]) +class ClientInitiatedRemoteActorSpec extends + WordSpec with + MustMatchers with + BeforeAndAfterAll with + BeforeAndAfterEach { + + var optimizeLocal_? = ActorRegistry.remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_? + + override def beforeAll() { + ActorRegistry.remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) //Can't run the test if we're eliminating all remote calls + ActorRegistry.remote.start() + } + + override def afterAll() { + ActorRegistry.remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(optimizeLocal_?) //Reset optimizelocal after all tests + ActorRegistry.shutdownAll + } + + override def afterEach() { + ActorRegistry.shutdownAll + super.afterEach + } + + "ClientInitiatedRemoteActor" should { + val unit = TimeUnit.MILLISECONDS + val (host, port) = (ActorRegistry.remote.hostname,ActorRegistry.remote.port) + + "shouldSendOneWay" in { + val clientManaged = actorOf[RemoteActorSpecActorUnidirectional](host,port).start + clientManaged must not be null + clientManaged.getClass must be (classOf[RemoteActorRef]) + clientManaged ! "OneWay" + RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS) must be (true) + clientManaged.stop + } + + "shouldSendOneWayAndReceiveReply" in { + val latch = new CountDownLatch(1) + val actor = actorOf[SendOneWayAndReplyReceiverActor](host,port).start + implicit val sender = Some(actorOf(new CountDownActor(latch)).start) + + actor ! "Hello" + + latch.await(3,TimeUnit.SECONDS) must be (true) + } + + "shouldSendBangBangMessageAndReceiveReply" in { + val actor = actorOf[RemoteActorSpecActorBidirectional](host,port).start + val result = actor !! "Hello" + "World" must equal (result.get.asInstanceOf[String]) + actor.stop + } + + "shouldSendBangBangMessageAndReceiveReplyConcurrently" in { + val actors = (1 to 10).map(num => { actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList + actors.map(_ !!! "Hello") foreach { future => + "World" must equal (future.await.result.asInstanceOf[Option[String]].get) + } + actors.foreach(_.stop) + } + + "shouldRegisterActorByUuid" in { + val actor1 = actorOf[MyActorCustomConstructor](host, port).start + val actor2 = actorOf[MyActorCustomConstructor](host, port).start + + actor1 ! "incrPrefix" + + (actor1 !! "test").get must equal ("1-test") + + actor1 ! "incrPrefix" + + (actor1 !! "test").get must equal ("2-test") + + (actor2 !! "test").get must equal ("default-test") + + actor1.stop + actor2.stop + } + + "shouldSendAndReceiveRemoteException" in { + + val actor = actorOf[RemoteActorSpecActorBidirectional](host, port).start + try { + implicit val timeout = 500000000L + val f = (actor !!! "Failure").await.resultOrException + fail("Shouldn't get here!!!") + } catch { + case e: ExpectedRemoteProblem => + } + actor.stop + } + } +} diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 14387e7909..7b48cde9bb 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -1,29 +1,31 @@ package akka.actor.remote import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.junit.JUnitSuite -import org.junit.{Test, Before, After} +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith import akka.util._ -import akka.remote.{RemoteServer, RemoteClient} import akka.actor.Actor._ import akka.actor.{ActorRegistry, ActorRef, Actor} +import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient} object ServerInitiatedRemoteActorSpec { - val HOSTNAME = "localhost" - val PORT = 9990 - var server: RemoteServer = null - case class Send(actor: ActorRef) - object RemoteActorSpecActorUnidirectional { - val latch = new CountDownLatch(1) - } - class RemoteActorSpecActorUnidirectional extends Actor { - + class ReplyHandlerActor(latch: CountDownLatch, expect: String) extends Actor { def receive = { - case "OneWay" => - RemoteActorSpecActorUnidirectional.latch.countDown + case x: String if x == expect => latch.countDown + } + } + + def replyHandler(latch: CountDownLatch, expect: String) = Some(actorOf(new ReplyHandlerActor(latch, expect)).start) + + class RemoteActorSpecActorUnidirectional extends Actor { + def receive = { + case "Ping" => self.reply_?("Pong") } } @@ -51,170 +53,140 @@ object ServerInitiatedRemoteActorSpec { } } -class ServerInitiatedRemoteActorSpec extends JUnitSuite { +@RunWith(classOf[JUnitRunner]) +class ServerInitiatedRemoteActorSpec extends + WordSpec with + MustMatchers with + BeforeAndAfterAll with + BeforeAndAfterEach { import ServerInitiatedRemoteActorSpec._ + import ActorRegistry.remote private val unit = TimeUnit.MILLISECONDS + val (host, port) = (remote.hostname,remote.port) - @Before - def init { - server = new RemoteServer() + var optimizeLocal_? = remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_? - server.start(HOSTNAME, PORT) - - server.register(actorOf[RemoteActorSpecActorUnidirectional]) - server.register(actorOf[RemoteActorSpecActorBidirectional]) - server.register(actorOf[RemoteActorSpecActorAsyncSender]) - - Thread.sleep(1000) + override def beforeAll() { + remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) //Can't run the test if we're eliminating all remote calls + remote.start() } - // make sure the servers postStop cleanly after the test has finished - @After - def finished { - try { - server.shutdown - val s2 = RemoteServer.serverFor(HOSTNAME, PORT + 1) - if (s2.isDefined) s2.get.shutdown - RemoteClient.shutdownAll - Thread.sleep(1000) - } catch { - case e => () - } + override def afterAll() { + remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(optimizeLocal_?) //Reset optimizelocal after all tests } - @Test - def shouldSendWithBang { - val actor = RemoteClient.actorFor( - "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", - 5000L, - HOSTNAME, PORT) - val result = actor ! "OneWay" - assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) - actor.stop + override def afterEach() { + ActorRegistry.shutdownAll + super.afterEach } - @Test - def shouldSendWithBangBangAndGetReply { - val actor = RemoteClient.actorFor( - "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", - 5000L, - HOSTNAME, PORT) - val result = actor !! "Hello" - assert("World" === result.get.asInstanceOf[String]) - actor.stop - } + "Server-managed remote actors" should { + "sendWithBang" in { + val latch = new CountDownLatch(1) + implicit val sender = replyHandler(latch, "Pong") + remote.register(actorOf[RemoteActorSpecActorUnidirectional]) + val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional",5000L,host, port) - @Test - def shouldSendWithBangAndGetReplyThroughSenderRef { - implicit val timeout = 500000000L - val actor = RemoteClient.actorFor( - "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", - timeout, - HOSTNAME, PORT) - val sender = actorOf[RemoteActorSpecActorAsyncSender] - sender.homeAddress = (HOSTNAME, PORT + 1) - sender.start - sender ! Send(actor) - assert(RemoteActorSpecActorAsyncSender.latch.await(1, TimeUnit.SECONDS)) - actor.stop - } - - @Test - def shouldSendWithBangBangAndReplyWithException { - implicit val timeout = 500000000L - val actor = RemoteClient.actorFor( - "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", - timeout, - HOSTNAME, PORT) - try { - actor !! "Failure" - fail("Should have thrown an exception") - } catch { - case e => - assert("Expected exception; to test fault-tolerance" === e.getMessage()) - } - actor.stop - } - - @Test - def reflectiveAccessShouldNotCreateNewRemoteServerObject { - val server1 = new RemoteServer() - server1.start("localhost", 9990) - - var found = RemoteServer.serverFor("localhost", 9990) - assert(found.isDefined, "sever not found") - - val a = actorOf( new Actor { def receive = { case _ => } } ).start - - found = RemoteServer.serverFor("localhost", 9990) - assert(found.isDefined, "sever not found after creating an actor") + actor ! "Ping" + latch.await(1, TimeUnit.SECONDS) must be (true) } + "sendWithBangBangAndGetReply" in { + remote.register(actorOf[RemoteActorSpecActorBidirectional]) + val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", 5000L,host, port) + (actor !! "Hello").as[String].get must equal ("World") + } - @Test - def shouldNotRecreateRegisteredActor { - server.register(actorOf[RemoteActorSpecActorUnidirectional]) - val actor = RemoteClient.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT) - val numberOfActorsInRegistry = ActorRegistry.actors.length - actor ! "OneWay" - assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) - assert(numberOfActorsInRegistry === ActorRegistry.actors.length) - actor.stop + "sendWithBangAndGetReplyThroughSenderRef" in { + remote.register(actorOf[RemoteActorSpecActorBidirectional]) + implicit val timeout = 500000000L + val actor = remote.actorFor( + "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout,host, port) + val sender = actorOf[RemoteActorSpecActorAsyncSender].start + sender ! Send(actor) + RemoteActorSpecActorAsyncSender.latch.await(1, TimeUnit.SECONDS) must be (true) + } + + "sendWithBangBangAndReplyWithException" in { + remote.register(actorOf[RemoteActorSpecActorBidirectional]) + implicit val timeout = 500000000L + val actor = remote.actorFor( + "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, host, port) + try { + actor !! "Failure" + fail("Should have thrown an exception") + } catch { + case e => e.getMessage must equal ("Expected exception; to test fault-tolerance") + } + } + + "notRecreateRegisteredActor" in { + val latch = new CountDownLatch(1) + implicit val sender = replyHandler(latch, "Pong") + remote.register(actorOf[RemoteActorSpecActorUnidirectional]) + val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", host, port) + val numberOfActorsInRegistry = ActorRegistry.actors.length + actor ! "Ping" + latch.await(1, TimeUnit.SECONDS) must be (true) + numberOfActorsInRegistry must equal (ActorRegistry.actors.length) + } + + "UseServiceNameAsIdForRemoteActorRef" in { + val latch = new CountDownLatch(3) + implicit val sender = replyHandler(latch, "Pong") + remote.register(actorOf[RemoteActorSpecActorUnidirectional]) + remote.register("my-service", actorOf[RemoteActorSpecActorUnidirectional]) + val actor1 = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", host, port) + val actor2 = remote.actorFor("my-service", host, port) + val actor3 = remote.actorFor("my-service", host, port) + + actor1 ! "Ping" + actor2 ! "Ping" + actor3 ! "Ping" + + latch.await(1, TimeUnit.SECONDS) must be (true) + actor1.uuid must not equal actor2.uuid + actor1.uuid must not equal actor3.uuid + actor1.id must not equal actor2.id + actor2.id must equal (actor3.id) + } + + "shouldFindActorByUuid" in { + val latch = new CountDownLatch(2) + implicit val sender = replyHandler(latch, "Pong") + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + val actor2 = actorOf[RemoteActorSpecActorUnidirectional] + remote.register("uuid:" + actor1.uuid, actor1) + remote.register("my-service", actor2) + + val ref1 = remote.actorFor("uuid:" + actor1.uuid, host, port) + val ref2 = remote.actorFor("my-service", host, port) + + ref1 ! "Ping" + ref2 ! "Ping" + latch.await(1, TimeUnit.SECONDS) must be (true) + } + + "shouldRegisterAndUnregister" in { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + + remote.register("my-service-1", actor1) + remote.actors.get("my-service-1") must not be null + + remote.unregister("my-service-1") + remote.actors.get("my-service-1") must be (null) + } + + "shouldRegisterAndUnregisterByUuid" in { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + val uuid = "uuid:" + actor1.uuid + + remote.register(uuid, actor1) + remote.actorsByUuid.get(actor1.uuid.toString) must not be null + + remote.unregister(uuid) + remote.actorsByUuid.get(actor1.uuid) must be (null) + } } - - @Test - def shouldUseServiceNameAsIdForRemoteActorRef { - server.register(actorOf[RemoteActorSpecActorUnidirectional]) - server.register("my-service", actorOf[RemoteActorSpecActorUnidirectional]) - val actor1 = RemoteClient.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT) - val actor2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) - val actor3 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) - - actor1 ! "OneWay" - actor2 ! "OneWay" - actor3 ! "OneWay" - - assert(actor1.uuid != actor2.uuid) - assert(actor1.uuid != actor3.uuid) - assert(actor1.id != actor2.id) - assert(actor2.id == actor3.id) - } - - @Test - def shouldFindActorByUuid { - val actor1 = actorOf[RemoteActorSpecActorUnidirectional] - val actor2 = actorOf[RemoteActorSpecActorUnidirectional] - server.register("uuid:" + actor1.uuid, actor1) - server.register("my-service", actor2) - - val ref1 = RemoteClient.actorFor("uuid:" + actor1.uuid, HOSTNAME, PORT) - val ref2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) - - ref1 ! "OneWay" - assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) - ref1.stop - ref2 ! "OneWay" - ref2.stop - - } - - @Test - def shouldRegisterAndUnregister { - val actor1 = actorOf[RemoteActorSpecActorUnidirectional] - server.register("my-service-1", actor1) - assert(server.actors.get("my-service-1") ne null, "actor registered") - server.unregister("my-service-1") - assert(server.actors.get("my-service-1") eq null, "actor unregistered") - } - - @Test - def shouldRegisterAndUnregisterByUuid { - val actor1 = actorOf[RemoteActorSpecActorUnidirectional] - server.register("uuid:" + actor1.uuid, actor1) - assert(server.actorsByUuid.get(actor1.uuid.toString) ne null, "actor registered") - server.unregister("uuid:" + actor1.uuid) - assert(server.actorsByUuid.get(actor1.uuid) eq null, "actor unregistered") - } - }