diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index d1c55e88d2..28a47646c3 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -172,7 +172,17 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def getDispatcher(): MessageDispatcher = dispatcher
/**
- * Holds the hot swapped partial function.
+ * Returns on which node this actor lives
+ */
+ def homeAddress: InetSocketAddress
+
+ /**
+ * Java API
+ */
+ def getHomeAddress(): InetSocketAddress = homeAddress
+
+ /**
+ * Holds the hot swapped partial function.
*/
@volatile
protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]()
@@ -504,8 +514,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
- //TODO: REVISIT: REMOVE
- //protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid]
+ protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid]
protected[akka] def linkedActors: JMap[Uuid, ActorRef]
@@ -541,7 +550,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* @author Jonas Bonér
*/
class LocalActorRef private[akka] (
- private[this] val actorFactory: () => Actor)
+ private[this] val actorFactory: () => Actor,
+ val homeAddress: InetSocketAddress = Remote.localAddress)
extends ActorRef with ScalaActorRef {
@volatile
@@ -566,15 +576,14 @@ class LocalActorRef private[akka] (
private[akka] def this(
__uuid: Uuid,
__id: String,
- __hostname: String,
- __port: Int,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: LifeCycle,
__supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]],
- __factory: () => Actor) = {
- this(__factory)
+ __factory: () => Actor,
+ __homeAddress: InetSocketAddress) = {
+ this(__factory, __homeAddress)
_uuid = __uuid
id = __id
timeout = __timeout
@@ -587,6 +596,8 @@ class LocalActorRef private[akka] (
ActorRegistry.register(this) //TODO: REVISIT: Is this needed?
}
+ private final def isClientManaged_? = (homeAddress ne Remote.localAddress) && homeAddress != Remote.localAddress
+
// ========= PUBLIC FUNCTIONS =========
/**
@@ -630,6 +641,9 @@ class LocalActorRef private[akka] (
if ((actorInstance ne null) && (actorInstance.get ne null))
initializeActorInstance
+ if (isRemotingEnabled && isClientManaged_?)
+ ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
+
checkReceiveTimeout //Schedule the initial Receive timeout
}
this
@@ -646,6 +660,11 @@ class LocalActorRef private[akka] (
_status = ActorRefInternals.SHUTDOWN
actor.postStop
ActorRegistry.unregister(this)
+ if (isRemotingEnabled) {
+ if (isClientManaged_?)
+ ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
+ ActorRegistry.remote.unregister(this)
+ }
setActorSelfFields(actorInstance.get,null)
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
}
@@ -771,21 +790,28 @@ class LocalActorRef private[akka] (
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
- protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
- val invocation = new MessageInvocation(this, message, senderOption, None)
- dispatcher dispatchMessage invocation
- }
+ protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
+ if (isClientManaged_? && isRemotingEnabled) {
+ ActorRegistry.remote.send[Any](
+ message, senderOption, None, homeAddress, timeout, true, this, None, ActorType.ScalaActor, None)
+ } else
+ dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None)
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
+ if (isClientManaged_? && isRemotingEnabled) {
+ val future = ActorRegistry.remote.send[T](
+ message, senderOption, senderFuture, homeAddress, timeout, false, this, None, ActorType.ScalaActor, None)
+ if (future.isDefined) future.get
+ else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
+ } else {
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
- val invocation = new MessageInvocation(
- this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
- dispatcher dispatchMessage invocation
+ dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
future.get
+ }
}
/**
@@ -942,15 +968,13 @@ class LocalActorRef private[akka] (
}
}
- //TODO: REVISIT: REMOVE
-
- /*protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
+ protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
ensureRemotingEnabled
if (_supervisor.isDefined) {
- remoteAddress.foreach(address => RemoteClientModule.registerSupervisorForActor(address, this))
+ ActorRegistry.remote.registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
} else None
- }*/
+ }
protected[akka] def linkedActors: JMap[Uuid, ActorRef] = _linkedActors
@@ -1074,7 +1098,7 @@ object RemoteActorSystemMessage {
* @author Jonas Bonér
*/
private[akka] case class RemoteActorRef private[akka] (
- classOrServiceName: Option[String],
+ classOrServiceName: String,
val actorClassName: String,
val hostname: String,
val port: Int,
@@ -1087,9 +1111,9 @@ private[akka] case class RemoteActorRef private[akka] (
val homeAddress = new InetSocketAddress(hostname, port)
- protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed
-
- id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id
+ //protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed
+ id = classOrServiceName
+ //id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id
timeout = _timeout
@@ -1110,22 +1134,21 @@ private[akka] case class RemoteActorRef private[akka] (
def start: ActorRef = synchronized {
_status = ActorRefInternals.RUNNING
- if (clientManaged)
- ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
+ //if (clientManaged)
+ // ActorRegistry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
this
}
def stop: Unit = synchronized {
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
- postMessageToMailbox(RemoteActorSystemMessage.Stop, None) //TODO: REVISIT: Should this be called for both server-managed and client-managed?
- if (clientManaged)
- ActorRegistry.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
+ postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
+ // if (clientManaged)
+ // ActorRegistry.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
}
}
- //TODO: REVISIT: REMOVE
- //protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None
+ protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None
// ==== NOT SUPPORTED ====
def actorClass: Class[_ <: Actor] = unsupported
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
index 620aea9ecc..e3934f1a58 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
@@ -396,7 +396,7 @@ object ActorRegistry extends ListenerManagement {
}
} else foreach(_.stop)
if (Remote.isEnabled) {
- remote.clear
+ remote.clear //TODO: REVISIT: Should this be here?
}
actorsByUUID.clear
actorsById.clear
diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
index ea816a7c51..11ed70a08a 100644
--- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
+++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
@@ -28,8 +28,8 @@ trait RemoteModule extends Logging {
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {
def shutdown {
- this.shutdownServerModule
this.shutdownClientModule
+ this.shutdownServerModule
clear
}
protected override def manageLifeCycleOfListeners = false
@@ -241,10 +241,9 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
actorType: ActorType,
loader: Option[ClassLoader]): Option[CompletableFuture[T]]
- //TODO: REVISIT: IMPLEMENT OR REMOVE
- //private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef
+ private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef
- //private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef
+ private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef
/**
* Clean-up all open connections.
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 1a02136f20..5ba5f513b2 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -37,6 +37,8 @@ object ReflectiveAccess extends Logging {
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
val PORT = Config.config.getInt("akka.remote.server.port", 2552)
+ lazy val localAddress = new InetSocketAddress(HOSTNAME,PORT)
+
lazy val isEnabled = remoteSupportClass.isDefined
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
diff --git a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala
index c5453a1830..c4d2036cc4 100644
--- a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala
@@ -11,8 +11,6 @@ import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization
import akka.japi.Creator
-import akka.actor.{newUuid,ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException,
- RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule}
import akka.config.Config._
import akka.serialization.RemoteActorSerialization._
@@ -34,13 +32,11 @@ import org.jboss.netty.handler.ssl.SslHandler
import java.net.{ SocketAddress, InetSocketAddress }
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
-import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
-
import scala.collection.mutable.{ HashSet, HashMap }
import scala.reflect.BeanProperty
import java.lang.reflect.InvocationTargetException
-
-
+import akka.actor. {ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
+import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
/**
* Life-cycle events for RemoteClient.
@@ -78,7 +74,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private val remoteActors = new HashMap[Address, HashSet[Uuid]]
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
- TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(Some(serviceId), implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor))
+ TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor))
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
@@ -117,12 +113,12 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
remoteClients -= hash
}
}
- //TODO: REVISIT IMPLEMENT OR REMOVE
- /*private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
- clientFor().registerSupervisorForActor(actorRef)
+
+ private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
+ clientFor(actorRef.homeAddress, None).registerSupervisorForActor(actorRef)
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef =
- clientFor().deregisterSupervisorForActor(actorRef)*/
+ clientFor(actorRef.homeAddress, None).deregisterSupervisorForActor(actorRef)
/**
* Clean-up all open connections.
@@ -265,7 +261,7 @@ class RemoteClient private[akka] (
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
- log.slf4j.debug("sending message: {} is running {} has future {}", Array[AnyRef](request, isRunning.asInstanceOf[AnyRef], senderFuture))
+ log.slf4j.debug("sending message: {} has future {}", request, senderFuture)
if (isRunning) {
if (request.getOneWay) {
connection.getChannel.write(request)
@@ -280,8 +276,7 @@ class RemoteClient private[akka] (
Some(futureResult)
}
} else {
- val exception = new RemoteClientException(
- "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
+ val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
notifyListeners(RemoteClientError(exception, this))
throw exception
}
@@ -580,32 +575,26 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
// case _ =>
// RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader)
//}
- RemoteActorRef(Some(serviceId), className, hostname, port, timeout, loader)
+ RemoteActorRef(serviceId, className, hostname, port, timeout, loader)
}
def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = {
- val Host = this.hostname
- val Port = this.port
-
- (host,port) match {
- case (Host, Port) if optimizeLocalScoped_? =>
- ActorRegistry.actorOf(clazz) //Local
- case _ =>
- new RemoteActorRef(None,clazz.getName,host,port,timeout,None)
- }
+ import ReflectiveAccess.{ createInstance, noParams, noArgs }
+ val ref = new LocalActorRef(() => createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
+ throw new ActorInitializationException(
+ "Could not instantiate Actor" +
+ "\nMake sure Actor is NOT defined inside a class/trait," +
+ "\nif so put it outside the class/trait, f.e. in a companion object," +
+ "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
+ new InetSocketAddress(host, port))
+ ref.timeout = timeout
+ ref
}
}
-trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
- import RemoteServer._
+class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
- @volatile private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
-
- def hostname = address.hostname
- def port = address.port
- def name = "RemoteServer@" + hostname + ":" + port
-
- private val _isRunning = new Switch(false)
+ val name = "NettyRemoteServer@" + host + ":" + port
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool,Executors.newCachedThreadPool)
@@ -614,23 +603,57 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
+ val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, serverModule)
+ bootstrap.setPipelineFactory(pipelineFactory)
+ bootstrap.setOption("child.tcpNoDelay", true)
+ bootstrap.setOption("child.keepAlive", true)
+ bootstrap.setOption("child.reuseAddress", true)
+ bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
+
+ openChannels.add(bootstrap.bind(new InetSocketAddress(host, port)))
+ serverModule.notifyListeners(RemoteServerStarted(serverModule))
+
+ def shutdown {
+ try {
+ openChannels.disconnect
+ openChannels.close.awaitUninterruptibly
+ bootstrap.releaseExternalResources
+ serverModule.notifyListeners(RemoteServerShutdown(serverModule))
+ } catch {
+ case e: java.nio.channels.ClosedChannelException => {}
+ case e => serverModule.log.slf4j.warn("Could not close remote server channel in a graceful way")
+ }
+ }
+}
+
+trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
+ import RemoteServer._
+
+ private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
+ def hostname = currentServer.get match {
+ case Some(s) => s.host
+ case None => ReflectiveAccess.Remote.HOSTNAME
+ }
+
+ def port = currentServer.get match {
+ case Some(s) => s.port
+ case None => ReflectiveAccess.Remote.PORT
+ }
+
+ def name = currentServer.get match {
+ case Some(s) => s.name
+ case None => "NettyRemoteServer@" + ReflectiveAccess.Remote.HOSTNAME + ":" + ReflectiveAccess.Remote.PORT
+ }
+
+ private val _isRunning = new Switch(false)
+
def isRunning = _isRunning.isOn
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
try {
_isRunning switchOn {
- address = Address(_hostname,_port)
- log.slf4j.info("Starting remote server at [{}:{}]", hostname, port)
-
- val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, this)
- bootstrap.setPipelineFactory(pipelineFactory)
- bootstrap.setOption("child.tcpNoDelay", true)
- bootstrap.setOption("child.keepAlive", true)
- bootstrap.setOption("child.reuseAddress", true)
- bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
-
- openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
- notifyListeners(RemoteServerStarted(this))
+ log.slf4j.debug("Starting up remote server on {}:{}",_hostname, _port)
+ currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
case e =>
@@ -642,14 +665,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
def shutdownServerModule = guard withGuard {
_isRunning switchOff {
- try {
- openChannels.disconnect
- openChannels.close.awaitUninterruptibly
- bootstrap.releaseExternalResources
- notifyListeners(RemoteServerShutdown(this))
- } catch {
- case e: java.nio.channels.ClosedChannelException => {}
- case e => log.slf4j.warn("Could not close remote server channel in a graceful way")
+ currentServer.getAndSet(None) foreach {
+ instance =>
+ log.slf4j.debug("Shutting down remote server on {}:{}",instance.host, instance.port)
+ instance.shutdown
}
}
}
@@ -1136,7 +1155,7 @@ class RemoteServerHandler(
val id = actorInfo.getId
val sessionActorRefOrNull = findSessionActor(id, channel)
if (sessionActorRefOrNull ne null) {
- log.debug("found session actor with id {} for channel {}",id, channel)
+ log.slf4j.debug("Found session actor with id {} for channel {} = {}",Array[AnyRef](id, channel, sessionActorRefOrNull))
sessionActorRefOrNull
} else {
// we dont have it in the session either, see if we have a factory for it
diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
index 6032818e09..92edf9d582 100644
--- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
+++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
@@ -18,6 +18,7 @@ import scala.collection.immutable.Stack
import com.google.protobuf.ByteString
import akka.util.ReflectiveAccess
import akka.util.ReflectiveAccess.Remote.{HOSTNAME,PORT}
+import java.net.InetSocketAddress
/**
* Type class definition for Actor Serialization
@@ -90,9 +91,10 @@ object ActorSerialization {
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)(format)
- val localAddress = AddressProtocol.newBuilder
- .setHostname(HOSTNAME)
- .setPort(PORT)
+ private[akka] def toAddressProtocol(actorRef: ActorRef) =
+ AddressProtocol.newBuilder
+ .setHostname(actorRef.homeAddress.getHostName)
+ .setPort(actorRef.homeAddress.getPort)
.build
private[akka] def toSerializedActorRefProtocol[T <: Actor](
@@ -109,7 +111,7 @@ object ActorSerialization {
.setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build)
.setId(actorRef.id)
.setActorClassname(actorRef.actorClass.getName)
- .setOriginalAddress(localAddress)
+ .setOriginalAddress(toAddressProtocol(actorRef))
.setTimeout(actorRef.timeout)
@@ -193,14 +195,13 @@ object ActorSerialization {
val ar = new LocalActorRef(
uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow),
protocol.getId,
- protocol.getOriginalAddress.getHostname,
- protocol.getOriginalAddress.getPort,
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None,
lifeCycle,
supervisor,
hotswap,
- factory)
+ factory,
+ new InetSocketAddress(protocol.getOriginalAddress.getHostname,protocol.getOriginalAddress.getPort))
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))
@@ -230,7 +231,7 @@ object RemoteActorSerialization {
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
Actor.log.slf4j.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n {}", protocol)
val ref = RemoteActorRef(
- Some(protocol.getClassOrServiceName),
+ protocol.getClassOrServiceName,
protocol.getActorClassname,
protocol.getHomeAddress.getHostname,
protocol.getHomeAddress.getPort,
@@ -247,15 +248,14 @@ object RemoteActorSerialization {
def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = {
import ar._
- Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]",
- Array[AnyRef](actorClassName, ActorSerialization.localAddress.getHostname, ActorSerialization.localAddress.getPort.asInstanceOf[AnyRef]))
+ Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]",actorClassName, ar.homeAddress)
ActorRegistry.remote.registerByUuid(ar)
RemoteActorRefProtocol.newBuilder
.setClassOrServiceName("uuid:"+uuid.toString)
.setActorClassname(actorClassName)
- .setHomeAddress(ActorSerialization.localAddress)
+ .setHomeAddress(ActorSerialization.toAddressProtocol(ar))
.setTimeout(timeout)
.build
}
@@ -322,8 +322,6 @@ object RemoteActorSerialization {
secureCookie.foreach(messageBuilder.setCookie(_))
- //TODO: REVISIT: REMOVE
- /**
actorRef.foreach { ref =>
ref.registerSupervisorAsRemoteActor.foreach { id =>
messageBuilder.setSupervisorUuid(
@@ -332,7 +330,7 @@ object RemoteActorSerialization {
.setLow(id.getClockSeqAndNode)
.build)
}
- }*/
+ }
if( senderOption.isDefined)
messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala
index c5ff2b3024..b22eaa8b27 100644
--- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala
+++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala
@@ -32,16 +32,22 @@ class AkkaRemoteTest extends
var optimizeLocal_? = remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_?
- override def beforeAll() {
+ override def beforeAll {
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) //Can't run the test if we're eliminating all remote calls
- remote.start()
}
- override def afterAll() {
+ override def afterAll {
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(optimizeLocal_?) //Reset optimizelocal after all tests
}
+ override def beforeEach {
+ remote.start()
+ Thread.sleep(2000)
+ super.beforeEach
+ }
+
override def afterEach() {
+ remote.shutdown
ActorRegistry.shutdownAll
super.afterEach
}
diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala
index 4b3e301aae..ad0028c8ea 100644
--- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala
+++ b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala
@@ -9,8 +9,8 @@ import org.junit.runner.RunWith
import akka.dispatch.Dispatchers
import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient}
-import akka.actor. {RemoteActorRef, ActorRegistry, ActorRef, Actor}
import akka.actor.Actor._
+import akka.actor._
class ExpectedRemoteProblem(msg: String) extends RuntimeException(msg)
@@ -78,7 +78,7 @@ class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
"shouldSendOneWay" in {
val clientManaged = actorOf[RemoteActorSpecActorUnidirectional](host,port).start
clientManaged must not be null
- clientManaged.getClass must be (classOf[RemoteActorRef])
+ clientManaged.getClass must be (classOf[LocalActorRef])
clientManaged ! "OneWay"
RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS) must be (true)
clientManaged.stop
diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala
index bb6ed80ca7..33760028fb 100644
--- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala
+++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala
@@ -1,7 +1,7 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB
*/
- /* THIS SHOULD BE UNCOMMENTED
+
package akka.actor.remote
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
@@ -9,8 +9,9 @@ import akka.serialization.BinaryString
import akka.config.Supervision._
import akka.remote.{RemoteServer, RemoteClient}
import akka.OneWay
-import org.scalatest.junit.JUnitSuite
-import org.junit.{Test, Before, After}
+import org.scalatest._
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
import akka.actor.{SupervisorFactory, Supervisor, ActorRef, Actor}
import Actor._
@@ -70,14 +71,7 @@ object Log {
}
}
-object RemoteSupervisorSpec {
- val HOSTNAME = "localhost"
- val PORT = 9988
- var server: RemoteServer = null
-}
-
-class RemoteSupervisorSpec extends JUnitSuite {
- import RemoteSupervisorSpec._
+class RemoteSupervisorSpec extends AkkaRemoteTest {
var pingpong1: ActorRef = _
var pingpong2: ActorRef = _
@@ -85,135 +79,253 @@ class RemoteSupervisorSpec extends JUnitSuite {
import Log._
- @Before
- def init {
- server = new RemoteServer()
- server.start(HOSTNAME, PORT)
- Thread.sleep(1000)
- }
+ "Remote supervision" should {
- @After
- def finished {
- try {
- server.shutdown
- RemoteClient.shutdownAll
- Thread.sleep(1000)
- } catch {
- case e => ()
+ "start server" in {
+ Log.messageLog.clear
+ val sup = getSingleActorAllForOneSupervisor
+
+ (pingpong1 !! BinaryString("Ping")) must equal (Some("pong"))
+ }
+
+ "StartServerForNestedSupervisorHierarchy" in {
+ clearMessageLogs
+ val sup = getNestedSupervisorsAllForOneConf
+ sup.start
+
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+ }
+
+ "killSingleActorOneForOne" in {
+ clearMessageLogs
+ val sup = getSingleActorOneForOneSupervisor
+
+ (pingpong1 !!! (BinaryString("Die"), 5000)).await.exception.isDefined must be (true)
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ }
+
+ "callKillCallSingleActorOneForOne" in {
+ clearMessageLogs
+ val sup = getSingleActorOneForOneSupervisor
+
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+
+ (pingpong1 !!! (BinaryString("Die"), 5000)).await.exception.isDefined must be (true)
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ }
+
+ "KillSingleActorAllForOne" in {
+ clearMessageLogs
+ val sup = getSingleActorAllForOneSupervisor
+
+ (pingpong1 !!! (BinaryString("Die"), 5000)).await.exception.isDefined must be (true)
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ }
+
+ "CallKillCallSingleActorAllForOne" in {
+ clearMessageLogs
+ val sup = getSingleActorAllForOneSupervisor
+
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+
+ (pingpong1 !!! (BinaryString("Die"), 5000)).await.exception.isDefined must be (true)
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ }
+
+ "KillMultipleActorsOneForOne1" in {
+ clearMessageLogs
+ val sup = getMultipleActorsOneForOneConf
+
+ (pingpong1 !!! (BinaryString("Die"), 5000)).await.exception.isDefined must be (true)
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ }
+
+ "KillCallMultipleActorsOneForOne" in {
+ clearMessageLogs
+ val sup = getMultipleActorsOneForOneConf
+
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+ (pingpong2 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+ (pingpong3 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+
+ (pingpong2 !!! (BinaryString("Die"), 5000)).await.exception.isDefined must be (true)
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+ (pingpong2 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+ (pingpong3 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ }
+
+ "KillMultipleActorsAllForOne" in {
+ clearMessageLogs
+ val sup = getMultipleActorsAllForOneConf
+
+ (pingpong2 !!! (BinaryString("Die"), 5000)).await.exception.isDefined must be (true)
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ }
+
+ "CallKillCallMultipleActorsAllForOne" in {
+ clearMessageLogs
+ val sup = getMultipleActorsAllForOneConf
+
+ pingpong1 !! (BinaryString("Ping"), 5000) must equal (Some("pong"))
+ pingpong2 !! (BinaryString("Ping"), 5000) must equal (Some("pong"))
+ pingpong3 !! (BinaryString("Ping"), 5000) must equal (Some("pong"))
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+
+ (pingpong2 !!! (BinaryString("Die"), 5000)).await.exception.isDefined must be (true)
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
+
+ pingpong1 !! (BinaryString("Ping"), 5000) must equal (Some("pong"))
+ pingpong2 !! (BinaryString("Ping"), 5000) must equal (Some("pong"))
+ pingpong3 !! (BinaryString("Ping"), 5000) must equal (Some("pong"))
+
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
+ messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
}
}
- @Test def shouldStartServer = {
- Log.messageLog.clear
- val sup = getSingleActorAllForOneSupervisor
+ def getSingleActorAllForOneSupervisor: Supervisor = {
- expect("pong") {
- (pingpong1 !! BinaryString("Ping")).getOrElse("nil")
- }
- }
- @Test def shouldStartServerForNestedSupervisorHierarchy = {
- clearMessageLogs
- val sup = getNestedSupervisorsAllForOneConf
- sup.start
+ // Create an abstract SupervisorContainer that works for all implementations
+ // of the different Actors (Services).
+ //
+ // Then create a concrete container in which we mix in support for the specific
+ // implementation of the Actors we want to use.
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
+ pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
+
+ val factory = SupervisorFactory(
+ SupervisorConfig(
+ AllForOneStrategy(List(classOf[Exception]), 3, 100),
+ Supervise(
+ pingpong1,
+ Permanent)
+ :: Nil))
+
+ factory.newInstance
}
- @Test def shouldKillSingleActorOneForOne = {
- clearMessageLogs
- val sup = getSingleActorOneForOneSupervisor
+ def getSingleActorOneForOneSupervisor: Supervisor = {
+ pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
- intercept[RuntimeException] {
- pingpong1 !! (BinaryString("Die"), 5000)
- }
-
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
+ val factory = SupervisorFactory(
+ SupervisorConfig(
+ OneForOneStrategy(List(classOf[Exception]), 3, 100),
+ Supervise(
+ pingpong1,
+ Permanent)
+ :: Nil))
+ factory.newInstance
}
- @Test def shouldCallKillCallSingleActorOneForOne = {
- clearMessageLogs
- val sup = getSingleActorOneForOneSupervisor
+ def getMultipleActorsAllForOneConf: Supervisor = {
+ pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
+ pingpong2 = actorOf[RemotePingPong2Actor](host,port).start
+ pingpong3 = actorOf[RemotePingPong3Actor](host,port).start
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- intercept[RuntimeException] {
- pingpong1 !! (BinaryString("Die"), 5000)
- }
-
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
+ val factory = SupervisorFactory(
+ SupervisorConfig(
+ AllForOneStrategy(List(classOf[Exception]), 3, 100),
+ Supervise(
+ pingpong1,
+ Permanent)
+ ::
+ Supervise(
+ pingpong2,
+ Permanent)
+ ::
+ Supervise(
+ pingpong3,
+ Permanent)
+ :: Nil))
+ factory.newInstance
}
- @Test def shouldKillSingleActorAllForOne = {
- clearMessageLogs
- val sup = getSingleActorAllForOneSupervisor
+ def getMultipleActorsOneForOneConf: Supervisor = {
+ pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
+ pingpong2 = actorOf[RemotePingPong2Actor](host,port).start
+ pingpong3 = actorOf[RemotePingPong3Actor](host,port).start
- intercept[RuntimeException] {
- pingpong1 !! (BinaryString("Die"), 5000)
- }
-
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
+ val factory = SupervisorFactory(
+ SupervisorConfig(
+ OneForOneStrategy(List(classOf[Exception]), 3, 100),
+ Supervise(
+ pingpong1,
+ Permanent)
+ ::
+ Supervise(
+ pingpong2,
+ Permanent)
+ ::
+ Supervise(
+ pingpong3,
+ Permanent)
+ :: Nil))
+ factory.newInstance
}
- @Test def shouldCallKillCallSingleActorAllForOne = {
- clearMessageLogs
- val sup = getSingleActorAllForOneSupervisor
+ def getNestedSupervisorsAllForOneConf: Supervisor = {
+ pingpong1 = actorOf[RemotePingPong1Actor](host,port).start
+ pingpong2 = actorOf[RemotePingPong2Actor](host,port).start
+ pingpong3 = actorOf[RemotePingPong3Actor](host,port).start
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- intercept[RuntimeException] {
- pingpong1 !! (BinaryString("Die"), 5000)
- }
-
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
+ val factory = SupervisorFactory(
+ SupervisorConfig(
+ AllForOneStrategy(List(classOf[Exception]), 3, 100),
+ Supervise(
+ pingpong1,
+ Permanent)
+ ::
+ SupervisorConfig(
+ AllForOneStrategy(List(classOf[Exception]), 3, 100),
+ Supervise(
+ pingpong2,
+ Permanent)
+ ::
+ Supervise(
+ pingpong3,
+ Permanent)
+ :: Nil)
+ :: Nil))
+ factory.newInstance
}
- @Test def shouldKillMultipleActorsOneForOne1 = {
- clearMessageLogs
- val sup = getMultipleActorsOneForOneConf
-
- intercept[RuntimeException] {
- pingpong1 !! (BinaryString("Die"), 5000)
- }
-
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- }*/
-
/*
// Uncomment when the same test passes in SupervisorSpec - pending bug
@Test def shouldKillMultipleActorsOneForOne2 = {
@@ -229,141 +341,6 @@ class RemoteSupervisorSpec extends JUnitSuite {
}
}
*/
- /* THIS SHOULD BE UNCOMMENTED
- @Test def shouldKillCallMultipleActorsOneForOne = {
- clearMessageLogs
- val sup = getMultipleActorsOneForOneConf
-
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("pong") {
- (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("pong") {
- (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- intercept[RuntimeException] {
- pingpong2 !! (BinaryString("Die"), 5000)
- }
-
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("pong") {
- (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("pong") {
- (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- }
-
- @Test def shouldKillMultipleActorsAllForOne = {
- clearMessageLogs
- val sup = getMultipleActorsAllForOneConf
-
- intercept[RuntimeException] {
- pingpong2 !! (BinaryString("Die"), 5000)
- }
-
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- }
-
- @Test def shouldCallKillCallMultipleActorsAllForOne = {
- clearMessageLogs
- val sup = getMultipleActorsAllForOneConf
-
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("pong") {
- (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("pong") {
- (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- intercept[RuntimeException] {
- pingpong2 !! (BinaryString("Die"), 5000)
- }
-
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("Expected exception; to test fault-tolerance") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("pong") {
- (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("pong") {
- (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
- }
-
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- expect("ping") {
- messageLog.poll(5, TimeUnit.SECONDS)
- }
- }*/
/*
@@ -405,15 +382,15 @@ class RemoteSupervisorSpec extends JUnitSuite {
expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
}
expect("pong") {
- (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
+ (pingpong2 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
}
expect("pong") {
- (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
+ (pingpong3 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
}
expect("ping") {
@@ -439,15 +416,15 @@ class RemoteSupervisorSpec extends JUnitSuite {
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("pong") {
- (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
+ (pingpong1 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
}
expect("pong") {
- (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
+ (pingpong2 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
}
expect("pong") {
- (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil")
+ (pingpong3 !! (BinaryString("Ping"), 5000)) must equal (Some("pong"))
}
expect("ping") {
@@ -461,136 +438,5 @@ class RemoteSupervisorSpec extends JUnitSuite {
}
}
*/
- // =============================================
- // Creat some supervisors with different configurations
- /* THIS SHOULD BE UNCOMMENTED
- def getSingleActorAllForOneSupervisor: Supervisor = {
- // Create an abstract SupervisorContainer that works for all implementations
- // of the different Actors (Services).
- //
- // Then create a concrete container in which we mix in support for the specific
- // implementation of the Actors we want to use.
-
- pingpong1 = actorOf[RemotePingPong1Actor]
- pingpong1.makeRemote(HOSTNAME, PORT)
- pingpong1.start
-
- val factory = SupervisorFactory(
- SupervisorConfig(
- AllForOneStrategy(List(classOf[Exception]), 3, 100),
- Supervise(
- pingpong1,
- Permanent)
- :: Nil))
-
- factory.newInstance
- }
-
- def getSingleActorOneForOneSupervisor: Supervisor = {
- pingpong1 = actorOf[RemotePingPong1Actor]
- pingpong1.makeRemote(HOSTNAME, PORT)
- pingpong1.start
-
- val factory = SupervisorFactory(
- SupervisorConfig(
- OneForOneStrategy(List(classOf[Exception]), 3, 100),
- Supervise(
- pingpong1,
- Permanent)
- :: Nil))
- factory.newInstance
- }
-
- def getMultipleActorsAllForOneConf: Supervisor = {
- pingpong1 = actorOf[RemotePingPong1Actor]
- pingpong1.makeRemote(HOSTNAME, PORT)
- pingpong1.start
- pingpong2 = actorOf[RemotePingPong2Actor]
- pingpong2.makeRemote(HOSTNAME, PORT)
- pingpong2.start
- pingpong3 = actorOf[RemotePingPong3Actor]
- pingpong3.makeRemote(HOSTNAME, PORT)
- pingpong3.start
-
- val factory = SupervisorFactory(
- SupervisorConfig(
- AllForOneStrategy(List(classOf[Exception]), 3, 100),
- Supervise(
- pingpong1,
- Permanent)
- ::
- Supervise(
- pingpong2,
- Permanent)
- ::
- Supervise(
- pingpong3,
- Permanent)
- :: Nil))
- factory.newInstance
- }
-
- def getMultipleActorsOneForOneConf: Supervisor = {
- pingpong1 = actorOf[RemotePingPong1Actor]
- pingpong1.makeRemote(HOSTNAME, PORT)
- pingpong1 = actorOf[RemotePingPong1Actor]
- pingpong1.makeRemote(HOSTNAME, PORT)
- pingpong1.start
- pingpong2 = actorOf[RemotePingPong2Actor]
- pingpong2.makeRemote(HOSTNAME, PORT)
- pingpong2.start
- pingpong3 = actorOf[RemotePingPong3Actor]
- pingpong3.makeRemote(HOSTNAME, PORT)
- pingpong3.start
-
- val factory = SupervisorFactory(
- SupervisorConfig(
- OneForOneStrategy(List(classOf[Exception]), 3, 100),
- Supervise(
- pingpong1,
- Permanent)
- ::
- Supervise(
- pingpong2,
- Permanent)
- ::
- Supervise(
- pingpong3,
- Permanent)
- :: Nil))
- factory.newInstance
- }
-
- def getNestedSupervisorsAllForOneConf: Supervisor = {
- pingpong1 = actorOf[RemotePingPong1Actor]
- pingpong1.makeRemote(HOSTNAME, PORT)
- pingpong1.start
- pingpong2 = actorOf[RemotePingPong2Actor]
- pingpong2.makeRemote(HOSTNAME, PORT)
- pingpong2.start
- pingpong3 = actorOf[RemotePingPong3Actor]
- pingpong3.makeRemote(HOSTNAME, PORT)
- pingpong3.start
-
- val factory = SupervisorFactory(
- SupervisorConfig(
- AllForOneStrategy(List(classOf[Exception]), 3, 100),
- Supervise(
- pingpong1,
- Permanent)
- ::
- SupervisorConfig(
- AllForOneStrategy(List(classOf[Exception]), 3, 100),
- Supervise(
- pingpong2,
- Permanent)
- ::
- Supervise(
- pingpong3,
- Permanent)
- :: Nil)
- :: Nil))
- factory.newInstance
- }
-}*/
+}
diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala
index 49c31b1ae7..b0b10e5ada 100644
--- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala
+++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala
@@ -11,11 +11,10 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
-import java.util.concurrent.TimeUnit
-
import akka.actor._
import akka.actor.Actor._
import akka.remote.NettyRemoteSupport
+import java.util.concurrent. {ConcurrentSkipListSet, TimeUnit}
object ServerInitiatedRemoteSessionActorSpec {
@@ -23,27 +22,18 @@ object ServerInitiatedRemoteSessionActorSpec {
case class GetUser()
case class DoSomethingFunny()
- var instantiatedSessionActors= Set[ActorRef]()
+ val instantiatedSessionActors = new ConcurrentSkipListSet[ActorRef]()
class RemoteStatefullSessionActorSpec extends Actor {
- var user : String= "anonymous"
-
- override def preStart = {
- instantiatedSessionActors += self
- }
-
- override def postStop = {
- instantiatedSessionActors -= self
- }
+ override def preStart = instantiatedSessionActors.add(self)
+ override def postStop = instantiatedSessionActors.remove(self)
+ var user: String = "anonymous"
def receive = {
- case Login(user) =>
- this.user = user
- case GetUser() =>
- self.reply(this.user)
- case DoSomethingFunny() =>
- throw new Exception("Bad boy")
+ case Login(user) => this.user = user
+ case GetUser() => self.reply(this.user)
+ case DoSomethingFunny() => throw new Exception("Bad boy")
}
}
@@ -74,44 +64,37 @@ class ServerInitiatedRemoteSessionActorSpec extends AkkaRemoteTest {
default2.as[String] must equal (Some("anonymous"))
}
- /*"stop the actor when the client disconnects" in {
- val session1 = RemoteClient.actorFor(
- "untyped-session-actor-service",
- 5000L,
- HOSTNAME, PORT)
-
+ "stop the actor when the client disconnects" in {
+ instantiatedSessionActors.clear
+ remote.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec])
+ val session1 = remote.actorFor("untyped-session-actor-service", 5000L, host, port)
val default1 = session1 !! GetUser()
- default1.get.asInstanceOf[String] should equal ("anonymous")
+ default1.as[String] must equal (Some("anonymous"))
- instantiatedSessionActors should have size (1)
-
- RemoteClient.shutdownAll
- Thread.sleep(1000)
- instantiatedSessionActors should have size (0)
+ instantiatedSessionActors must have size (1)
+ remote.shutdownClientModule
+ instantiatedSessionActors must have size (0)
}
"stop the actor when there is an error" in {
- val session1 = RemoteClient.actorFor(
- "untyped-session-actor-service",
- 5000L,
- HOSTNAME, PORT)
-
+ instantiatedSessionActors.clear
+ remote.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec])
+ val session1 = remote.actorFor("untyped-session-actor-service", 5000L, host, port)
session1 ! DoSomethingFunny()
session1.stop()
-
Thread.sleep(1000)
- instantiatedSessionActors should have size (0)
+ instantiatedSessionActors must have size (0)
}
"be able to unregister" in {
- server.registerPerSession("my-service-1", actorOf[RemoteStatefullSessionActorSpec])
- server.actorsFactories.get("my-service-1") should not be (null)
- server.unregisterPerSession("my-service-1")
- server.actorsFactories.get("my-service-1") should be (null)
- } */
+ remote.registerPerSession("my-service-1", actorOf[RemoteStatefullSessionActorSpec])
+ remote.asInstanceOf[NettyRemoteSupport].actorsFactories.get("my-service-1") must not be (null)
+ remote.unregisterPerSession("my-service-1")
+ remote.asInstanceOf[NettyRemoteSupport].actorsFactories.get("my-service-1") must be (null)
+ }
}
}
diff --git a/akka-remote/src/test/scala/remote/ShutdownSpec.scala b/akka-remote/src/test/scala/remote/ShutdownSpec.scala
deleted file mode 100644
index 81d6a608ad..0000000000
--- a/akka-remote/src/test/scala/remote/ShutdownSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package akka.remote
-/* THIS SHOULD BE UNCOMMENTED
-import akka.actor.Actor
-
-import Actor._
-
-object ActorShutdownRunner {
- def main(args: Array[String]) {
- class MyActor extends Actor {
- def receive = {
- case "test" => println("received test")
- case m@_ => println("received unknown message " + m)
- }
- }
-
- val myActor = actorOf[MyActor]
- myActor.start
- myActor ! "test"
- myActor.stop
- }
-}
-
-
-// case 2
-
-object RemoteServerAndClusterShutdownRunner {
- def main(args: Array[String]) {
- val s1 = new RemoteServer
- val s2 = new RemoteServer
- val s3 = new RemoteServer
- s1.start("localhost", 2552)
- s2.start("localhost", 9998)
- s3.start("localhost", 9997)
- Thread.sleep(5000)
- s1.shutdown
- s2.shutdown
- s3.shutdown
- }
-}
-*/
\ No newline at end of file
diff --git a/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala b/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala
index 3c9aeb97f1..d0dcdac048 100644
--- a/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala
+++ b/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala
@@ -4,10 +4,6 @@ import java.util.concurrent.TimeUnit
import org.scalatest._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
-import org.scalatest.junit.JUnitRunner
-import org.junit.runner.RunWith
-
import akka.actor.{ProtobufProtocol, Actor}
import ProtobufProtocol.ProtobufPOJO
import Actor._
diff --git a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
index 7026d1cfc0..02b29e6de1 100644
--- a/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
+++ b/akka-remote/src/test/scala/serialization/ScalaJSONSerializerSpec.scala
@@ -7,6 +7,7 @@ import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.serialization.Serializer.ScalaJSON
+//TODO: FIXME WHY IS THIS COMMENTED OUT?
/*
object Protocols {
import sjson.json.DefaultProtocol._