diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 654b8141b8..eef45a459f 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -13,6 +13,8 @@ import java.util.concurrent.TimeUnit
import akka.event.EventStream
import akka.event.DeathWatch
import scala.annotation.tailrec
+import java.util.concurrent.ConcurrentHashMap
+import akka.event.LoggingAdapter
/**
* ActorRef is an immutable and serializable handle to an Actor.
@@ -400,6 +402,41 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
private def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
+class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef {
+
+ private val children = new ConcurrentHashMap[String, InternalActorRef]
+
+ def addChild(name: String, ref: InternalActorRef): Unit = {
+ children.put(name, ref) match {
+ case null ⇒ // okay
+ case old ⇒ log.warning("{} replacing child {} ({} -> {})", path, name, old, ref)
+ }
+ }
+
+ def removeChild(name: String): Unit = {
+ children.remove(name) match {
+ case null ⇒ log.warning("{} trying to remove non-child {}", path, name)
+ case _ ⇒ //okay
+ }
+ }
+
+ def getChild(name: String): InternalActorRef = children.get(name)
+
+ override def getChild(name: Iterator[String]): InternalActorRef = {
+ if (name.isEmpty) this
+ else {
+ val n = name.next()
+ if (n.isEmpty) this
+ else children.get(n) match {
+ case null ⇒ Nobody
+ case some ⇒
+ if (name.isEmpty) some
+ else some.getChild(name)
+ }
+ }
+ }
+}
+
class AskActorRef(
val path: ActorPath,
override val getParent: InternalActorRef,
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
index 739eee6234..962024ae32 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
@@ -448,12 +448,19 @@ class LocalActorRefProvider(
lazy val terminationFuture: DefaultPromise[Unit] = new DefaultPromise[Unit](Timeout.never)(dispatcher)
+ @volatile
+ private var extraNames: Map[String, InternalActorRef] = Map()
+
lazy val rootGuardian: InternalActorRef = new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) {
+ object Extra {
+ def unapply(s: String): Option[InternalActorRef] = extraNames.get(s)
+ }
override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = {
name match {
- case "temp" ⇒ tempContainer
- case _ ⇒ super.getSingleChild(name)
+ case "temp" ⇒ tempContainer
+ case Extra(e) ⇒ e
+ case _ ⇒ super.getSingleChild(name)
}
}
}
@@ -462,24 +469,7 @@ class LocalActorRefProvider(
lazy val systemGuardian: InternalActorRef = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, "system", true)
- lazy val tempContainer = new MinimalActorRef {
- val children = new ConcurrentHashMap[String, AskActorRef]
- def path = tempNode
- override def getParent = rootGuardian
- override def getChild(name: Iterator[String]): InternalActorRef = {
- if (name.isEmpty) this
- else {
- val n = name.next()
- if (n.isEmpty) this
- else children.get(n) match {
- case null ⇒ Nobody
- case some ⇒
- if (name.isEmpty) some
- else some.getChild(name)
- }
- }
- }
- }
+ lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log)
val deathWatch = createDeathWatch()
@@ -491,6 +481,8 @@ class LocalActorRefProvider(
eventStream.startDefaultLoggers(_system)
}
+ def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
+
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case RelativeActorPath(elems) ⇒
if (elems.isEmpty) deadLetters
@@ -578,10 +570,10 @@ class LocalActorRefProvider(
val name = path.name
val a = new AskActorRef(path, tempContainer, deathWatch, t, dispatcher) {
override def whenDone() {
- tempContainer.children.remove(name)
+ tempContainer.removeChild(name)
}
}
- tempContainer.children.put(name, a)
+ tempContainer.addChild(name, a)
recipient.tell(message, a)
a.result
}
diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java
index 67e260eef9..0fab0a33f3 100644
--- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java
+++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java
@@ -5265,6 +5265,10 @@ public final class RemoteProtocol {
boolean hasReplicateActorFromUuid();
akka.remote.RemoteProtocol.UuidProtocol getReplicateActorFromUuid();
akka.remote.RemoteProtocol.UuidProtocolOrBuilder getReplicateActorFromUuidOrBuilder();
+
+ // optional string supervisor = 5;
+ boolean hasSupervisor();
+ String getSupervisor();
}
public static final class RemoteSystemDaemonMessageProtocol extends
com.google.protobuf.GeneratedMessage
@@ -5360,11 +5364,44 @@ public final class RemoteProtocol {
return replicateActorFromUuid_;
}
+ // optional string supervisor = 5;
+ public static final int SUPERVISOR_FIELD_NUMBER = 5;
+ private java.lang.Object supervisor_;
+ public boolean hasSupervisor() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public String getSupervisor() {
+ java.lang.Object ref = supervisor_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ supervisor_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getSupervisorBytes() {
+ java.lang.Object ref = supervisor_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ supervisor_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
messageType_ = akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType.STOP;
actorPath_ = "";
payload_ = com.google.protobuf.ByteString.EMPTY;
replicateActorFromUuid_ = akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance();
+ supervisor_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5400,6 +5437,9 @@ public final class RemoteProtocol {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeMessage(4, replicateActorFromUuid_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(5, getSupervisorBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -5425,6 +5465,10 @@ public final class RemoteProtocol {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(4, replicateActorFromUuid_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(5, getSupervisorBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -5562,6 +5606,8 @@ public final class RemoteProtocol {
replicateActorFromUuidBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000008);
+ supervisor_ = "";
+ bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -5620,6 +5666,10 @@ public final class RemoteProtocol {
} else {
result.replicateActorFromUuid_ = replicateActorFromUuidBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.supervisor_ = supervisor_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -5648,6 +5698,9 @@ public final class RemoteProtocol {
if (other.hasReplicateActorFromUuid()) {
mergeReplicateActorFromUuid(other.getReplicateActorFromUuid());
}
+ if (other.hasSupervisor()) {
+ setSupervisor(other.getSupervisor());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -5719,6 +5772,11 @@ public final class RemoteProtocol {
setReplicateActorFromUuid(subBuilder.buildPartial());
break;
}
+ case 42: {
+ bitField0_ |= 0x00000010;
+ supervisor_ = input.readBytes();
+ break;
+ }
}
}
}
@@ -5899,6 +5957,42 @@ public final class RemoteProtocol {
return replicateActorFromUuidBuilder_;
}
+ // optional string supervisor = 5;
+ private java.lang.Object supervisor_ = "";
+ public boolean hasSupervisor() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public String getSupervisor() {
+ java.lang.Object ref = supervisor_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ supervisor_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setSupervisor(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000010;
+ supervisor_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearSupervisor() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ supervisor_ = getDefaultInstance().getSupervisor();
+ onChanged();
+ return this;
+ }
+ void setSupervisor(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000010;
+ supervisor_ = value;
+ onChanged();
+ }
+
// @@protoc_insertion_point(builder_scope:RemoteSystemDaemonMessageProtocol)
}
@@ -6655,27 +6749,27 @@ public final class RemoteProtocol {
"\t\022\r\n\005value\030\002 \002(\014\"A\n\017AddressProtocol\022\016\n\006s" +
"ystem\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 " +
"\002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 " +
- "\002(\t\022\017\n\007message\030\002 \002(\t\"\253\001\n!RemoteSystemDae" +
+ "\002(\t\022\017\n\007message\030\002 \002(\t\"\277\001\n!RemoteSystemDae" +
"monMessageProtocol\0223\n\013messageType\030\001 \002(\0162",
"\036.RemoteSystemDaemonMessageType\022\021\n\tactor" +
"Path\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026replicate" +
- "ActorFromUuid\030\004 \001(\0132\r.UuidProtocol\"y\n\035Du" +
- "rableMailboxMessageProtocol\022$\n\trecipient" +
- "\030\001 \002(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 \001(" +
- "\0132\021.ActorRefProtocol\022\017\n\007message\030\003 \002(\014*(\n" +
- "\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002*" +
- "K\n\026ReplicationStorageType\022\r\n\tTRANSIENT\020\001" +
- "\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA_GRID\020\003*>\n\027" +
- "ReplicationStrategyType\022\021\n\rWRITE_THROUGH",
- "\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035RemoteSystemDae" +
- "monMessageType\022\010\n\004STOP\020\001\022\007\n\003USE\020\002\022\013\n\007REL" +
- "EASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004\022\024\n\020MAKE_UNAVA" +
- "ILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r\n\tRECONNECT\020\007\022" +
- "\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n\025FAIL_OVER_CON" +
- "NECTIONS\020\024\022\026\n\022FUNCTION_FUN0_UNIT\020\025\022\025\n\021FU" +
- "NCTION_FUN0_ANY\020\026\022\032\n\026FUNCTION_FUN1_ARG_U" +
- "NIT\020\027\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\030B\017\n\013akka" +
- ".remoteH\001"
+ "ActorFromUuid\030\004 \001(\0132\r.UuidProtocol\022\022\n\nsu" +
+ "pervisor\030\005 \001(\t\"y\n\035DurableMailboxMessageP" +
+ "rotocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRefPro" +
+ "tocol\022!\n\006sender\030\002 \001(\0132\021.ActorRefProtocol" +
+ "\022\017\n\007message\030\003 \002(\014*(\n\013CommandType\022\013\n\007CONN" +
+ "ECT\020\001\022\014\n\010SHUTDOWN\020\002*K\n\026ReplicationStorag" +
+ "eType\022\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_LOG\020" +
+ "\002\022\r\n\tDATA_GRID\020\003*>\n\027ReplicationStrategyT",
+ "ype\022\021\n\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002" +
+ "*\241\002\n\035RemoteSystemDaemonMessageType\022\010\n\004ST" +
+ "OP\020\001\022\007\n\003USE\020\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAIL" +
+ "ABLE\020\004\022\024\n\020MAKE_UNAVAILABLE\020\005\022\016\n\nDISCONNE" +
+ "CT\020\006\022\r\n\tRECONNECT\020\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSI" +
+ "P\020\t\022\031\n\025FAIL_OVER_CONNECTIONS\020\024\022\026\n\022FUNCTI" +
+ "ON_FUN0_UNIT\020\025\022\025\n\021FUNCTION_FUN0_ANY\020\026\022\032\n" +
+ "\026FUNCTION_FUN1_ARG_UNIT\020\027\022\031\n\025FUNCTION_FU" +
+ "N1_ARG_ANY\020\030B\017\n\013akka.remoteH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6759,7 +6853,7 @@ public final class RemoteProtocol {
internal_static_RemoteSystemDaemonMessageProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RemoteSystemDaemonMessageProtocol_descriptor,
- new java.lang.String[] { "MessageType", "ActorPath", "Payload", "ReplicateActorFromUuid", },
+ new java.lang.String[] { "MessageType", "ActorPath", "Payload", "ReplicateActorFromUuid", "Supervisor", },
akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.class,
akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.Builder.class);
internal_static_DurableMailboxMessageProtocol_descriptor =
diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto
index efd605a108..f7763ba2cc 100644
--- a/akka-remote/src/main/protocol/RemoteProtocol.proto
+++ b/akka-remote/src/main/protocol/RemoteProtocol.proto
@@ -117,6 +117,7 @@ message RemoteSystemDaemonMessageProtocol {
optional string actorPath = 2;
optional bytes payload = 3;
optional UuidProtocol replicateActorFromUuid = 4;
+ optional string supervisor = 5;
}
/**
diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala
index 3335656e14..7b6fd8a660 100644
--- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala
+++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala
@@ -161,7 +161,7 @@ class Gossiper(remote: Remote) {
node ← oldAvailableNodes
if connectionManager.connectionFor(node).isEmpty
} {
- val connectionFactory = () ⇒ new RemoteActorRef(remote.system.provider, remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, None)
+ val connectionFactory = () ⇒ new RemoteActorRef(remote.system.provider, remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, Nobody, None)
connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node
oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes
}
diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala
index bfbe3aa658..6192f4d4d5 100644
--- a/akka-remote/src/main/scala/akka/remote/Remote.scala
+++ b/akka-remote/src/main/scala/akka/remote/Remote.scala
@@ -4,9 +4,8 @@
package akka.remote
-import akka.actor.ActorSystem
import akka.actor._
-import akka.event.Logging
+import akka.event._
import akka.actor.Status._
import akka.util._
import akka.util.duration._
@@ -17,13 +16,12 @@ import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import com.eaio.uuid.UUID
-import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression }
+import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression, SerializationExtension }
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.TimeUnit.MILLISECONDS
-import akka.serialization.SerializationExtension
import akka.dispatch.SystemMessage
-import akka.event.LoggingAdapter
+import scala.annotation.tailrec
/**
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
@@ -37,33 +35,15 @@ class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettin
import system._
import settings._
- private[remote] val serialization = SerializationExtension(system)
- private[remote] val remoteAddress = {
- RemoteAddress(system.name, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port)
- }
+ val serialization = SerializationExtension(system)
+
+ val remoteAddress = RemoteAddress(system.name, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port)
val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize)
- // val gossiper = new Gossiper(this)
-
- val remoteDaemonServiceName = "akka-system-remote-daemon".intern
-
val computeGridDispatcher = dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher")
- // FIXME it is probably better to create another supervisor for handling the children created by handle_*, ticket #1408
- val remoteDaemonSupervisor =
- system.provider.actorOf(system,
- Props(OneForOneStrategy(List(classOf[Exception]), None, None)),
- system.provider.rootGuardian,
- "akka-system-remote-supervisor",
- systemService = true) // is infinite restart what we want?
-
- val remoteDaemon =
- system.provider.actorOf(system,
- Props(new RemoteSystemDaemon(this)).withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)),
- remoteDaemonSupervisor,
- remoteDaemonServiceName,
- systemService = true)
+ val remoteDaemon = new RemoteSystemDaemon(this, provider.rootPath / "remote", provider.rootGuardian, log)
val remoteClientLifeCycleHandler = system.actorOf(Props(new Actor {
def receive = {
@@ -96,7 +76,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettin
}
}
- log.info("Starting remote server on [{}] and starting remoteDaemon {}", remoteAddress, remoteDaemon)
+ log.info("Starting remote server on [{}]", remoteAddress)
}
/**
@@ -106,75 +86,89 @@ class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettin
*
* @author Jonas Bonér
*/
-class RemoteSystemDaemon(remote: Remote) extends Actor {
+class RemoteSystemDaemon(remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter)
+ extends VirtualPathContainer(_path, _parent, _log) {
- import remote._
- import remote.{ system ⇒ systemImpl }
+ /**
+ * Find the longest matching path which we know about and return that ref
+ * (or ask that ref to continue searching if elements are left).
+ */
+ override def getChild(names: Iterator[String]): InternalActorRef = {
- override def preRestart(reason: Throwable, msg: Option[Any]) {
- log.debug("RemoteSystemDaemon failed due to [{}] - restarting...", reason)
+ @tailrec
+ def rec(s: String, n: Int): (InternalActorRef, Int) = {
+ getChild(s) match {
+ case null ⇒
+ val last = s.lastIndexOf('/')
+ if (last == -1) (Nobody, n)
+ else rec(s.substring(0, last), n + 1)
+ case ref ⇒ (ref, n)
+ }
+ }
+
+ val full = Vector() ++ names
+ rec(full.mkString("/"), 0) match {
+ case (Nobody, _) ⇒ Nobody
+ case (ref, n) if n == 0 ⇒ ref
+ case (ref, n) ⇒ ref.getChild(full.takeRight(n).iterator)
+ }
}
- def receive: Actor.Receive = {
+ override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
case message: RemoteSystemDaemonMessageProtocol ⇒
- log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, nodename)
+ log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, remote.nodename)
message.getMessageType match {
- case USE ⇒ handleUse(message)
- case RELEASE ⇒ handleRelease(message)
+ case USE ⇒ handleUse(message)
+ case RELEASE ⇒ handleRelease(message)
// case STOP ⇒ cluster.shutdown()
// case DISCONNECT ⇒ cluster.disconnect()
// case RECONNECT ⇒ cluster.reconnect()
// case RESIGN ⇒ cluster.resign()
// case FAIL_OVER_CONNECTIONS ⇒ handleFailover(message)
- case GOSSIP ⇒ handleGossip(message)
- case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message)
- case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message)
- case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message)
- case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message)
- //TODO: should we not deal with unrecognized message types?
+ case GOSSIP ⇒ handleGossip(message)
+ // case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message)
+ // case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message, sender)
+ // case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message)
+ // case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message, sender)
+ case unknown ⇒ log.warning("Unknown message type {} received by {}", unknown, this)
}
- case unknown ⇒ log.warning("Unknown message to RemoteSystemDaemon [{}]", unknown)
+ case Terminated(child) ⇒ removeChild(child.path.elements.drop(1).mkString("/"))
+
+ case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this)
}
def handleUse(message: RemoteSystemDaemonMessageProtocol) {
- try {
- if (message.hasActorPath) {
- val actorFactoryBytes =
- if (remoteSettings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray
+ if (!message.hasActorPath || !message.hasSupervisor) log.error("Ignoring incomplete USE command [{}]", message)
+ else {
- val actorFactory =
- serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match {
- case Left(error) ⇒ throw error
- case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor]
- }
+ val actorFactoryBytes =
+ if (remote.remoteSettings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray)
+ else message.getPayload.toByteArray
- message.getActorPath match {
- case RemoteActorPath(`remoteAddress`, elems) if elems.size > 0 ⇒
- val name = elems.last
- systemImpl.provider.actorFor(systemImpl.lookupRoot, elems.dropRight(1)) match {
- case x if x eq system.deadLetters ⇒
- log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message)
- case parent ⇒
- systemImpl.provider.actorOf(systemImpl, Props(creator = actorFactory), parent, name)
- }
- case _ ⇒
- log.error("remote path does not match path from message [{}]", message)
+ val actorFactory =
+ remote.serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match {
+ case Left(error) ⇒ throw error
+ case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor]
}
- } else {
- log.error("Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [{}]", message)
+ import remote.remoteAddress
+
+ message.getActorPath match {
+ case RemoteActorPath(`remoteAddress`, elems) if elems.size > 0 && elems.head == "remote" ⇒
+ // TODO RK canonicalize path so as not to duplicate it always
+ val subpath = elems.drop(1)
+ val path = remote.remoteDaemon.path / subpath
+ val supervisor = remote.system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef]
+ val actor = new LocalActorRef(remote.system, Props(creator = actorFactory), supervisor, path, true)
+ addChild(subpath.mkString("/"), actor)
+ remote.system.deathWatch.subscribe(this, actor)
+ case _ ⇒
+ log.error("remote path does not match path from message [{}]", message)
}
-
- sender ! Success(remoteAddress)
- } catch {
- case exc: Exception ⇒
- sender ! Failure(exc)
- throw exc
}
-
}
// FIXME implement handleRelease
@@ -201,45 +195,47 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
/*
* generate name for temporary actor refs
*/
- private val tempNumber = new AtomicLong
- def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
- def tempPath = remoteDaemon.path / tempName
-
- // FIXME: handle real remote supervision, ticket #1408
- def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) {
- new LocalActorRef(systemImpl,
- Props(
- context ⇒ {
- case f: Function0[_] ⇒ try { f() } finally { context.self.stop() }
- }).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
- }
-
- // FIXME: handle real remote supervision, ticket #1408
- def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol) {
- new LocalActorRef(systemImpl,
- Props(
- context ⇒ {
- case f: Function0[_] ⇒ try { sender ! f() } finally { context.self.stop() }
- }).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
- }
-
- // FIXME: handle real remote supervision, ticket #1408
- def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) {
- new LocalActorRef(systemImpl,
- Props(
- context ⇒ {
- case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() }
- }).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
- }
-
- // FIXME: handle real remote supervision, ticket #1408
- def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol) {
- new LocalActorRef(systemImpl,
- Props(
- context ⇒ {
- case (fun: Function[_, _], param: Any) ⇒ try { sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() }
- }).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
- }
+ // private val tempNumber = new AtomicLong
+ // def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
+ // def tempPath = remote.remoteDaemon.path / tempName
+ //
+ // // FIXME: handle real remote supervision, ticket #1408
+ // def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) {
+ // new LocalActorRef(remote.system,
+ // Props(
+ // context ⇒ {
+ // case f: Function0[_] ⇒ try { f() } finally { context.self.stop() }
+ // }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
+ // }
+ //
+ // // FIXME: handle real remote supervision, ticket #1408
+ // def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol, sender: ActorRef) {
+ // implicit val s = sender
+ // new LocalActorRef(remote.system,
+ // Props(
+ // context ⇒ {
+ // case f: Function0[_] ⇒ try { context.sender ! f() } finally { context.self.stop() }
+ // }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Any]])
+ // }
+ //
+ // // FIXME: handle real remote supervision, ticket #1408
+ // def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) {
+ // new LocalActorRef(remote.system,
+ // Props(
+ // context ⇒ {
+ // case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() }
+ // }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
+ // }
+ //
+ // // FIXME: handle real remote supervision, ticket #1408
+ // def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol, sender: ActorRef) {
+ // implicit val s = sender
+ // new LocalActorRef(remote.system,
+ // Props(
+ // context ⇒ {
+ // case (fun: Function[_, _], param: Any) ⇒ try { context.sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() }
+ // }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
+ // }
def handleFailover(message: RemoteSystemDaemonMessageProtocol) {
// val (from, to) = payloadFor(message, classOf[(InetSocketremoteDaemonServiceName, InetSocketremoteDaemonServiceName)])
@@ -247,7 +243,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}
private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = {
- serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
+ remote.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
case Left(error) ⇒ throw error
case Right(instance) ⇒ instance.asInstanceOf[T]
}
@@ -275,6 +271,8 @@ trait RemoteMarshallingOps {
def system: ActorSystem
+ def remote: Remote
+
protected def useUntrustedMode: Boolean
def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@@ -312,7 +310,18 @@ trait RemoteMarshallingOps {
def receiveMessage(remoteMessage: RemoteMessage) {
log.debug("received message {}", remoteMessage)
+ val remoteDaemon = remote.remoteDaemon
+
remoteMessage.recipient match {
+ case `remoteDaemon` ⇒
+ remoteMessage.payload match {
+ case m: RemoteSystemDaemonMessageProtocol ⇒
+ implicit val timeout = system.settings.ActorTimeout
+ try remoteDaemon ! m catch {
+ case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m.getMessageType(), remoteMessage.sender)
+ }
+ case x ⇒ log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender)
+ }
case l @ (_: LocalActorRef | _: MinimalActorRef) ⇒
remoteMessage.payload match {
case msg: SystemMessage ⇒
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
index 44e183ece1..a68fa5e0e6 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
@@ -48,12 +48,9 @@ class RemoteActorRefProvider(
def nodename = remoteSettings.NodeName
def clustername = remoteSettings.ClusterName
- private val actors = new ConcurrentHashMap[String, AnyRef]
-
val rootPath: ActorPath = RootActorPath(RemoteAddress(systemName, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port))
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath)
private var serialization: Serialization = _
- private var remoteDaemonConnectionManager: RemoteConnectionManager = _
private var _remote: Remote = _
def remote = _remote
@@ -62,7 +59,7 @@ class RemoteActorRefProvider(
local.init(system)
serialization = SerializationExtension(system)
_remote = new Remote(system, nodename, remoteSettings)
- remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
+ local.registerExtraNames(Map(("remote", remote.remoteDaemon)))
terminationFuture.onComplete(_ ⇒ remote.server.shutdown())
}
@@ -77,90 +74,80 @@ class RemoteActorRefProvider(
if (systemService) local.actorOf(system, props, supervisor, name, systemService)
else {
val path = supervisor.path / name
- val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher)
- actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
- case null ⇒
- val actor: InternalActorRef = try {
- deployer.lookupDeploymentFor(path.toString) match {
- case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, RemoteDeploymentConfig.RemoteScope(remoteAddresses))) ⇒
+ deployer.lookupDeploymentFor(path.elements.mkString("/", "/", "")) match {
+ case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, RemoteDeploymentConfig.RemoteScope(remoteAddresses))) ⇒
- def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
-
- //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
-
- if (isReplicaNode) {
- // we are on one of the replica node for this remote actor
- local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?)
- } else {
-
- implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
- implicit val timeout = system.settings.ActorTimeout
-
- // we are on the single "reference" node uses the remote actors on the replica nodes
- val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
- case RouterType.Direct ⇒
- if (remoteAddresses.size != 1) throw new ConfigurationException(
- "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
- .format(name, remoteAddresses.mkString(", ")))
- () ⇒ new DirectRouter
-
- case RouterType.Broadcast ⇒
- if (remoteAddresses.size != 1) throw new ConfigurationException(
- "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]"
- .format(name, remoteAddresses.mkString(", ")))
- () ⇒ new BroadcastRouter
-
- case RouterType.Random ⇒
- if (remoteAddresses.size < 1) throw new ConfigurationException(
- "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
- .format(name, remoteAddresses.mkString(", ")))
- () ⇒ new RandomRouter
-
- case RouterType.RoundRobin ⇒
- if (remoteAddresses.size < 1) throw new ConfigurationException(
- "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
- .format(name, remoteAddresses.mkString(", ")))
- () ⇒ new RoundRobinRouter
-
- case RouterType.ScatterGather ⇒
- if (remoteAddresses.size < 1) throw new ConfigurationException(
- "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
- .format(name, remoteAddresses.mkString(", ")))
- () ⇒ new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout)
-
- case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
- case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
- case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
- case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
- }
-
- val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒
- conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here
- }
-
- val connectionManager = new RemoteConnectionManager(system, remote, connections)
-
- connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
-
- actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
- }
-
- case deploy ⇒ local.actorOf(system, props, supervisor, name, systemService)
- }
- } catch {
- case e: Exception ⇒
- newFuture completeWithException e // so the other threads gets notified of error
- throw e
+ // FIXME RK deployer shall only concern itself with placement of actors on remote nodes
+ val address = remoteAddresses.head
+ if (address == rootPath.address) local.actorOf(system, props, supervisor, name, true) // FIXME RK make non-system
+ else {
+ val rpath = RootActorPath(address) / "remote" / rootPath.address.hostPort / path.elements
+ useActorOnNode(rpath, props.creator, supervisor)
+ new RemoteActorRef(this, remote.server, rpath, supervisor, None)
}
- // actor foreach system.registry.register // only for ActorRegistry backward compat, will be removed later
-
- newFuture completeWithResult actor
- actors.replace(path.toString, newFuture, actor)
- actor
- case actor: InternalActorRef ⇒ actor
- case future: Future[_] ⇒ future.get.asInstanceOf[InternalActorRef]
+// def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
+//
+// //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
+//
+// if (isReplicaNode) {
+// // we are on one of the replica node for this remote actor
+// local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?)
+// } else {
+//
+// implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
+// implicit val timeout = system.settings.ActorTimeout
+//
+// // we are on the single "reference" node uses the remote actors on the replica nodes
+// val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
+// case RouterType.Direct ⇒
+// if (remoteAddresses.size != 1) throw new ConfigurationException(
+// "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
+// .format(name, remoteAddresses.mkString(", ")))
+// () ⇒ new DirectRouter
+//
+// case RouterType.Broadcast ⇒
+// if (remoteAddresses.size != 1) throw new ConfigurationException(
+// "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]"
+// .format(name, remoteAddresses.mkString(", ")))
+// () ⇒ new BroadcastRouter
+//
+// case RouterType.Random ⇒
+// if (remoteAddresses.size < 1) throw new ConfigurationException(
+// "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
+// .format(name, remoteAddresses.mkString(", ")))
+// () ⇒ new RandomRouter
+//
+// case RouterType.RoundRobin ⇒
+// if (remoteAddresses.size < 1) throw new ConfigurationException(
+// "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
+// .format(name, remoteAddresses.mkString(", ")))
+// () ⇒ new RoundRobinRouter
+//
+// case RouterType.ScatterGather ⇒
+// if (remoteAddresses.size < 1) throw new ConfigurationException(
+// "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
+// .format(name, remoteAddresses.mkString(", ")))
+// () ⇒ new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout)
+//
+// case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
+// case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
+// case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
+// case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
+// }
+//
+// val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒
+// conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here
+// }
+//
+// val connectionManager = new RemoteConnectionManager(system, remote, connections)
+//
+// connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
+//
+// actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
+// }
+ case deploy ⇒ local.actorOf(system, props, supervisor, name, systemService)
}
}
@@ -175,14 +162,14 @@ class RemoteActorRefProvider(
def actorFor(path: ActorPath): InternalActorRef = path.root match {
case `rootPath` ⇒ actorFor(rootGuardian, path.elements)
- case RootActorPath(_: RemoteAddress, _) ⇒ new RemoteActorRef(this, remote.server, path, None)
+ case RootActorPath(_: RemoteAddress, _) ⇒ new RemoteActorRef(this, remote.server, path, Nobody, None)
case _ ⇒ local.actorFor(path)
}
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case RemoteActorPath(address, elems) ⇒
if (address == rootPath.address) actorFor(rootGuardian, elems)
- else new RemoteActorRef(this, remote.server, new RootActorPath(address) / elems, None)
+ else new RemoteActorRef(this, remote.server, new RootActorPath(address) / elems, Nobody, None)
case _ ⇒ local.actorFor(ref, path)
}
@@ -192,16 +179,11 @@ class RemoteActorRefProvider(
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
- /**
- * Returns true if the actor was in the provider's cache and evicted successfully, else false.
- */
- private[akka] def evict(path: ActorPath): Boolean = actors.remove(path) ne null
-
/**
* Using (checking out) actor on a specific node.
*/
- def useActorOnNode(system: ActorSystem, remoteAddress: RemoteAddress, actorPath: String, actorFactory: () ⇒ Actor) {
- log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress)
+ def useActorOnNode(path: ActorPath, actorFactory: () ⇒ Actor, supervisor: ActorRef) {
+ log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path)
val actorFactoryBytes =
serialization.serialize(actorFactory) match {
@@ -211,16 +193,13 @@ class RemoteActorRefProvider(
val command = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(USE)
- .setActorPath(actorPath)
+ .setActorPath(path.toString)
.setPayload(ByteString.copyFrom(actorFactoryBytes))
+ .setSupervisor(supervisor.path.toString)
.build()
- val connectionFactory = () ⇒ actorFor(RootActorPath(remoteAddress) / remote.remoteDaemon.path.elements)
-
- // try to get the connection for the remote address, if not already there then create it
- val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory)
-
- sendCommandToRemoteNode(connection, command, withACK = true) // ensure we get an ACK on the USE command
+ // we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor
+ actorFor(RootActorPath(path.address) / "remote") ! command
}
private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) {
@@ -265,12 +244,13 @@ private[akka] class RemoteActorRef private[akka] (
provider: ActorRefProvider,
remote: RemoteSupport,
val path: ActorPath,
+ val getParent: InternalActorRef,
loader: Option[ClassLoader])
extends InternalActorRef {
- // FIXME RK
- def getParent = Nobody
- def getChild(name: Iterator[String]) = Nobody
+ def getChild(name: Iterator[String]): InternalActorRef = {
+ new RemoteActorRef(provider, remote, path / name.toStream, Nobody, loader)
+ }
@volatile
private var running: Boolean = true
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala
index d0b7a863d4..49ae8d995c 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala
@@ -149,5 +149,5 @@ class RemoteConnectionManager(
}
private[remote] def newConnection(remoteAddress: RemoteAddress, actorPath: ActorPath) =
- new RemoteActorRef(remote.system.provider, remote.server, actorPath, None)
+ new RemoteActorRef(remote.system.provider, remote.server, actorPath, Nobody, None)
}
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala
index 487ad683a3..c3db02e9f1 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala
@@ -44,17 +44,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings, _eventStream: EventStream,
"Config option [" + deploymentKey +
".remote.nodes] needs to be a list with elements on format \":\", was [" + remoteNodes.mkString(", ") + "]")
- val remoteAddresses = remoteNodes map { node ⇒
- val tokenizer = new java.util.StringTokenizer(node, ":")
- val hostname = tokenizer.nextElement.toString
- if ((hostname eq null) || (hostname == "")) raiseRemoteNodeParsingError()
- val port = try tokenizer.nextElement.toString.toInt catch {
- case e: Exception ⇒ raiseRemoteNodeParsingError()
- }
- if (port == 0) raiseRemoteNodeParsingError()
-
- RemoteAddress(settings.name, hostname, port)
- }
+ val remoteAddresses = remoteNodes map (RemoteAddress(_, settings.name))
RemoteScope(remoteAddresses)
}
diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala
index 72ed415e10..9d53e75993 100644
--- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala
@@ -7,6 +7,10 @@ import akka.testkit._
import akka.actor._
import com.typesafe.config._
+object RemoteCommunicationSpec {
+ val echo = Props(ctx ⇒ { case x ⇒ ctx.sender ! x })
+}
+
class RemoteCommunicationSpec extends AkkaSpec("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
@@ -16,9 +20,16 @@ akka {
hostname = localhost
port = 12345
}
+ actor.deployment {
+ /user/blub {
+ remote.nodes = ["remote_sys@localhost:12346"]
+ }
+ }
}
""") with ImplicitSender {
+ import RemoteCommunicationSpec._
+
val conf = ConfigFactory.parseString("akka.remote.server.port=12346").withFallback(system.settings.config)
val other = ActorSystem("remote_sys", conf)
@@ -67,6 +78,13 @@ akka {
}(other)
}
+ "create children on remote node" in {
+ val r = system.actorOf(echo, "blub")
+ r.path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteCommunicationSpec@localhost:12345/user/blub"
+ r ! 42
+ expectMsg(42)
+ }
+
}
}
\ No newline at end of file