fix ActorRef serialization
- represent it by SerializedActorRef(path), i.e. only a “tagged” string - remove serialize/deserialize from ActorRefProvider interface - adapt test since deadLetters is returned when nothing found instead of exception - multi-jvm tests are still broken, but that is due to look-up of remote actors, which I have just not done yet
This commit is contained in:
parent
b65799c7f3
commit
6b9cdc5f65
10 changed files with 59 additions and 279 deletions
|
|
@ -278,14 +278,14 @@ class ActorRefSpec extends AkkaSpec {
|
|||
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }"
|
||||
}
|
||||
|
||||
"must throw exception on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
|
||||
"must return deadLetters on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
|
||||
import java.io._
|
||||
|
||||
val baos = new ByteArrayOutputStream(8192 * 32)
|
||||
val out = new ObjectOutputStream(baos)
|
||||
|
||||
val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.address
|
||||
val serialized = SerializedActorRef(addr.hostPort, 0, "/this/path/does/not/exist")
|
||||
val serialized = SerializedActorRef(addr + "/non-existing")
|
||||
|
||||
out.writeObject(serialized)
|
||||
|
||||
|
|
@ -294,9 +294,7 @@ class ActorRefSpec extends AkkaSpec {
|
|||
|
||||
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
(intercept[java.lang.IllegalStateException] {
|
||||
in.readObject
|
||||
}).getMessage must be === "Could not deserialize ActorRef"
|
||||
in.readObject must be === system.deadLetters
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -246,7 +246,7 @@ class LocalActorRef private[akka] (
|
|||
protected[akka] override def restart(cause: Throwable): Unit = actorCell.restart(cause)
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = actorCell.provider.serialize(this)
|
||||
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -293,23 +293,15 @@ trait ScalaActorRef { ref: ActorRef ⇒
|
|||
* Memento pattern for serializing ActorRefs transparently
|
||||
*/
|
||||
// FIXME: remove and replace by ActorPath.toString
|
||||
case class SerializedActorRef(hostname: String, port: Int, path: String) {
|
||||
case class SerializedActorRef(path: String) {
|
||||
import akka.serialization.Serialization.currentSystem
|
||||
|
||||
// FIXME this is broken, but see above
|
||||
def this(address: Address, path: String) = this(address.hostPort, 0, path)
|
||||
def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.host, remoteAddress.port, path)
|
||||
def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
def readResolve(): AnyRef = currentSystem.value match {
|
||||
case null ⇒ throw new IllegalStateException(
|
||||
"Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
|
||||
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
|
||||
case someSystem ⇒ someSystem.provider.deserialize(this) match {
|
||||
case Some(actor) ⇒ actor
|
||||
case None ⇒ throw new IllegalStateException("Could not deserialize ActorRef")
|
||||
}
|
||||
case someSystem ⇒ someSystem.actorFor(path)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -419,5 +411,5 @@ class AskActorRef(val path: ActorPath, provider: ActorRefProvider, deathWatch: D
|
|||
override def stop(): Unit = if (!isTerminated) result.completeWithException(new ActorKilledException("Stopped"))
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = provider.serialize(this)
|
||||
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -96,10 +96,6 @@ trait ActorRefProvider {
|
|||
*/
|
||||
def actorFor(p: Iterable[String]): ActorRef
|
||||
|
||||
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef]
|
||||
|
||||
private[akka] def serialize(actor: ActorRef): SerializedActorRef
|
||||
|
||||
private[akka] def createDeathWatch(): DeathWatch
|
||||
|
||||
/**
|
||||
|
|
@ -372,10 +368,6 @@ class LocalActorRefProvider(
|
|||
new RoutedActorRef(system, props, supervisor, name)
|
||||
}
|
||||
|
||||
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = Some(actorFor(actor.path))
|
||||
|
||||
private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(rootPath.address, actor.path.toString)
|
||||
|
||||
private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch
|
||||
|
||||
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
|
||||
|
|
|
|||
|
|
@ -52,10 +52,7 @@ trait DurableMessageSerialization {
|
|||
|
||||
def serialize(durableMessage: Envelope): Array[Byte] = {
|
||||
|
||||
def serializeActorRef(ref: ActorRef): ActorRefProtocol = {
|
||||
val serRef = owner.system.provider.serialize(ref)
|
||||
ActorRefProtocol.newBuilder.setPath(serRef.path).setHost(serRef.hostname).setPort(serRef.port).build
|
||||
}
|
||||
def serializeActorRef(ref: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(ref.path.toString).build
|
||||
|
||||
val message = MessageSerializer.serialize(owner.system, durableMessage.message.asInstanceOf[AnyRef])
|
||||
val builder = RemoteMessageProtocol.newBuilder
|
||||
|
|
@ -68,10 +65,7 @@ trait DurableMessageSerialization {
|
|||
|
||||
def deserialize(bytes: Array[Byte]): Envelope = {
|
||||
|
||||
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = {
|
||||
val serRef = SerializedActorRef(refProtocol.getHost, refProtocol.getPort, refProtocol.getPath)
|
||||
owner.system.provider.deserialize(serRef).getOrElse(owner.system.deadLetters)
|
||||
}
|
||||
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = owner.system.actorFor(refProtocol.getPath)
|
||||
|
||||
val durableMessage = RemoteMessageProtocol.parseFrom(bytes)
|
||||
val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage)
|
||||
|
|
|
|||
|
|
@ -29,11 +29,7 @@ class BSONSerializableMailbox(system: ActorSystem) extends SerializableBSONObjec
|
|||
val b = Map.newBuilder[String, Any]
|
||||
b += "_id" -> msg._id
|
||||
b += "ownerPath" -> msg.ownerPath
|
||||
|
||||
val sender = systemImpl.provider.serialize(msg.sender)
|
||||
b += "senderPath" -> sender.path
|
||||
b += "senderHostname" -> sender.hostname
|
||||
b += "senderPort" -> sender.port
|
||||
b += "senderPath" -> msg.sender.path
|
||||
|
||||
/**
|
||||
* TODO - Figure out a way for custom serialization of the message instance
|
||||
|
|
@ -75,10 +71,7 @@ class BSONSerializableMailbox(system: ActorSystem) extends SerializableBSONObjec
|
|||
val msg = MessageSerializer.deserialize(system, msgData)
|
||||
val ownerPath = doc.as[String]("ownerPath")
|
||||
val senderPath = doc.as[String]("senderPath")
|
||||
val senderHostname = doc.as[String]("senderHostname")
|
||||
val senderPort = doc.as[Int]("senderPort")
|
||||
val sender = systemImpl.provider.deserialize(SerializedActorRef(senderHostname, senderPort, senderPath)).
|
||||
getOrElse(system.deadLetters)
|
||||
val sender = systemImpl.actorOf(senderPath)
|
||||
|
||||
MongoDurableMessage(ownerPath, msg, sender)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2711,15 +2711,7 @@ public final class RemoteProtocol {
|
|||
public interface ActorRefProtocolOrBuilder
|
||||
extends com.google.protobuf.MessageOrBuilder {
|
||||
|
||||
// required string host = 1;
|
||||
boolean hasHost();
|
||||
String getHost();
|
||||
|
||||
// required uint32 port = 2;
|
||||
boolean hasPort();
|
||||
int getPort();
|
||||
|
||||
// required string path = 3;
|
||||
// required string path = 1;
|
||||
boolean hasPath();
|
||||
String getPath();
|
||||
}
|
||||
|
|
@ -2752,53 +2744,11 @@ public final class RemoteProtocol {
|
|||
}
|
||||
|
||||
private int bitField0_;
|
||||
// required string host = 1;
|
||||
public static final int HOST_FIELD_NUMBER = 1;
|
||||
private java.lang.Object host_;
|
||||
public boolean hasHost() {
|
||||
return ((bitField0_ & 0x00000001) == 0x00000001);
|
||||
}
|
||||
public String getHost() {
|
||||
java.lang.Object ref = host_;
|
||||
if (ref instanceof String) {
|
||||
return (String) ref;
|
||||
} else {
|
||||
com.google.protobuf.ByteString bs =
|
||||
(com.google.protobuf.ByteString) ref;
|
||||
String s = bs.toStringUtf8();
|
||||
if (com.google.protobuf.Internal.isValidUtf8(bs)) {
|
||||
host_ = s;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
}
|
||||
private com.google.protobuf.ByteString getHostBytes() {
|
||||
java.lang.Object ref = host_;
|
||||
if (ref instanceof String) {
|
||||
com.google.protobuf.ByteString b =
|
||||
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
|
||||
host_ = b;
|
||||
return b;
|
||||
} else {
|
||||
return (com.google.protobuf.ByteString) ref;
|
||||
}
|
||||
}
|
||||
|
||||
// required uint32 port = 2;
|
||||
public static final int PORT_FIELD_NUMBER = 2;
|
||||
private int port_;
|
||||
public boolean hasPort() {
|
||||
return ((bitField0_ & 0x00000002) == 0x00000002);
|
||||
}
|
||||
public int getPort() {
|
||||
return port_;
|
||||
}
|
||||
|
||||
// required string path = 3;
|
||||
public static final int PATH_FIELD_NUMBER = 3;
|
||||
// required string path = 1;
|
||||
public static final int PATH_FIELD_NUMBER = 1;
|
||||
private java.lang.Object path_;
|
||||
public boolean hasPath() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
return ((bitField0_ & 0x00000001) == 0x00000001);
|
||||
}
|
||||
public String getPath() {
|
||||
java.lang.Object ref = path_;
|
||||
|
|
@ -2827,8 +2777,6 @@ public final class RemoteProtocol {
|
|||
}
|
||||
|
||||
private void initFields() {
|
||||
host_ = "";
|
||||
port_ = 0;
|
||||
path_ = "";
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
|
|
@ -2836,14 +2784,6 @@ public final class RemoteProtocol {
|
|||
byte isInitialized = memoizedIsInitialized;
|
||||
if (isInitialized != -1) return isInitialized == 1;
|
||||
|
||||
if (!hasHost()) {
|
||||
memoizedIsInitialized = 0;
|
||||
return false;
|
||||
}
|
||||
if (!hasPort()) {
|
||||
memoizedIsInitialized = 0;
|
||||
return false;
|
||||
}
|
||||
if (!hasPath()) {
|
||||
memoizedIsInitialized = 0;
|
||||
return false;
|
||||
|
|
@ -2856,13 +2796,7 @@ public final class RemoteProtocol {
|
|||
throws java.io.IOException {
|
||||
getSerializedSize();
|
||||
if (((bitField0_ & 0x00000001) == 0x00000001)) {
|
||||
output.writeBytes(1, getHostBytes());
|
||||
}
|
||||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
output.writeUInt32(2, port_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeBytes(3, getPathBytes());
|
||||
output.writeBytes(1, getPathBytes());
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
|
@ -2875,15 +2809,7 @@ public final class RemoteProtocol {
|
|||
size = 0;
|
||||
if (((bitField0_ & 0x00000001) == 0x00000001)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSize(1, getHostBytes());
|
||||
}
|
||||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt32Size(2, port_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSize(3, getPathBytes());
|
||||
.computeBytesSize(1, getPathBytes());
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
|
|
@ -3009,12 +2935,8 @@ public final class RemoteProtocol {
|
|||
|
||||
public Builder clear() {
|
||||
super.clear();
|
||||
host_ = "";
|
||||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
port_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
path_ = "";
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -3056,14 +2978,6 @@ public final class RemoteProtocol {
|
|||
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
|
||||
to_bitField0_ |= 0x00000001;
|
||||
}
|
||||
result.host_ = host_;
|
||||
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
to_bitField0_ |= 0x00000002;
|
||||
}
|
||||
result.port_ = port_;
|
||||
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.path_ = path_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
|
|
@ -3081,12 +2995,6 @@ public final class RemoteProtocol {
|
|||
|
||||
public Builder mergeFrom(akka.remote.RemoteProtocol.ActorRefProtocol other) {
|
||||
if (other == akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) return this;
|
||||
if (other.hasHost()) {
|
||||
setHost(other.getHost());
|
||||
}
|
||||
if (other.hasPort()) {
|
||||
setPort(other.getPort());
|
||||
}
|
||||
if (other.hasPath()) {
|
||||
setPath(other.getPath());
|
||||
}
|
||||
|
|
@ -3095,14 +3003,6 @@ public final class RemoteProtocol {
|
|||
}
|
||||
|
||||
public final boolean isInitialized() {
|
||||
if (!hasHost()) {
|
||||
|
||||
return false;
|
||||
}
|
||||
if (!hasPort()) {
|
||||
|
||||
return false;
|
||||
}
|
||||
if (!hasPath()) {
|
||||
|
||||
return false;
|
||||
|
|
@ -3135,16 +3035,6 @@ public final class RemoteProtocol {
|
|||
}
|
||||
case 10: {
|
||||
bitField0_ |= 0x00000001;
|
||||
host_ = input.readBytes();
|
||||
break;
|
||||
}
|
||||
case 16: {
|
||||
bitField0_ |= 0x00000002;
|
||||
port_ = input.readUInt32();
|
||||
break;
|
||||
}
|
||||
case 26: {
|
||||
bitField0_ |= 0x00000004;
|
||||
path_ = input.readBytes();
|
||||
break;
|
||||
}
|
||||
|
|
@ -3154,67 +3044,10 @@ public final class RemoteProtocol {
|
|||
|
||||
private int bitField0_;
|
||||
|
||||
// required string host = 1;
|
||||
private java.lang.Object host_ = "";
|
||||
public boolean hasHost() {
|
||||
return ((bitField0_ & 0x00000001) == 0x00000001);
|
||||
}
|
||||
public String getHost() {
|
||||
java.lang.Object ref = host_;
|
||||
if (!(ref instanceof String)) {
|
||||
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
|
||||
host_ = s;
|
||||
return s;
|
||||
} else {
|
||||
return (String) ref;
|
||||
}
|
||||
}
|
||||
public Builder setHost(String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
bitField0_ |= 0x00000001;
|
||||
host_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
public Builder clearHost() {
|
||||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
host_ = getDefaultInstance().getHost();
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
void setHost(com.google.protobuf.ByteString value) {
|
||||
bitField0_ |= 0x00000001;
|
||||
host_ = value;
|
||||
onChanged();
|
||||
}
|
||||
|
||||
// required uint32 port = 2;
|
||||
private int port_ ;
|
||||
public boolean hasPort() {
|
||||
return ((bitField0_ & 0x00000002) == 0x00000002);
|
||||
}
|
||||
public int getPort() {
|
||||
return port_;
|
||||
}
|
||||
public Builder setPort(int value) {
|
||||
bitField0_ |= 0x00000002;
|
||||
port_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
public Builder clearPort() {
|
||||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
port_ = 0;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// required string path = 3;
|
||||
// required string path = 1;
|
||||
private java.lang.Object path_ = "";
|
||||
public boolean hasPath() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
return ((bitField0_ & 0x00000001) == 0x00000001);
|
||||
}
|
||||
public String getPath() {
|
||||
java.lang.Object ref = path_;
|
||||
|
|
@ -3230,19 +3063,19 @@ public final class RemoteProtocol {
|
|||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
bitField0_ |= 0x00000004;
|
||||
bitField0_ |= 0x00000001;
|
||||
path_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
public Builder clearPath() {
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
path_ = getDefaultInstance().getPath();
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
void setPath(com.google.protobuf.ByteString value) {
|
||||
bitField0_ |= 0x00000004;
|
||||
bitField0_ |= 0x00000001;
|
||||
path_ = value;
|
||||
onChanged();
|
||||
}
|
||||
|
|
@ -6864,35 +6697,34 @@ public final class RemoteProtocol {
|
|||
"\0132\026.MetadataEntryProtocol\"l\n\025RemoteContr" +
|
||||
"olProtocol\022!\n\013commandType\030\001 \002(\0162\014.Comman",
|
||||
"dType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origin\030\003 \001(\0132\020." +
|
||||
"AddressProtocol\"<\n\020ActorRefProtocol\022\014\n\004h" +
|
||||
"ost\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\022\014\n\004path\030\003 \002(\t\";\n" +
|
||||
"\017MessageProtocol\022\017\n\007message\030\001 \002(\014\022\027\n\017mes" +
|
||||
"sageManifest\030\002 \001(\014\")\n\014UuidProtocol\022\014\n\004hi" +
|
||||
"gh\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntryPr" +
|
||||
"otocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"1\n\017Ad" +
|
||||
"dressProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030" +
|
||||
"\002 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030" +
|
||||
"\001 \002(\t\022\017\n\007message\030\002 \002(\t\"\253\001\n!RemoteSystemD",
|
||||
"aemonMessageProtocol\0223\n\013messageType\030\001 \002(" +
|
||||
"\0162\036.RemoteSystemDaemonMessageType\022\021\n\tact" +
|
||||
"orPath\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026replica" +
|
||||
"teActorFromUuid\030\004 \001(\0132\r.UuidProtocol\"y\n\035" +
|
||||
"DurableMailboxMessageProtocol\022$\n\trecipie" +
|
||||
"nt\030\001 \002(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 " +
|
||||
"\001(\0132\021.ActorRefProtocol\022\017\n\007message\030\003 \002(\014*" +
|
||||
"(\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020" +
|
||||
"\002*K\n\026ReplicationStorageType\022\r\n\tTRANSIENT" +
|
||||
"\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA_GRID\020\003*>",
|
||||
"\n\027ReplicationStrategyType\022\021\n\rWRITE_THROU" +
|
||||
"GH\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035RemoteSystemD" +
|
||||
"aemonMessageType\022\010\n\004STOP\020\001\022\007\n\003USE\020\002\022\013\n\007R" +
|
||||
"ELEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004\022\024\n\020MAKE_UNA" +
|
||||
"VAILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r\n\tRECONNECT\020" +
|
||||
"\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n\025FAIL_OVER_C" +
|
||||
"ONNECTIONS\020\024\022\026\n\022FUNCTION_FUN0_UNIT\020\025\022\025\n\021" +
|
||||
"FUNCTION_FUN0_ANY\020\026\022\032\n\026FUNCTION_FUN1_ARG" +
|
||||
"_UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\030B\017\n\013ak" +
|
||||
"ka.remoteH\001"
|
||||
"AddressProtocol\" \n\020ActorRefProtocol\022\014\n\004p" +
|
||||
"ath\030\001 \002(\t\";\n\017MessageProtocol\022\017\n\007message\030" +
|
||||
"\001 \002(\014\022\027\n\017messageManifest\030\002 \001(\014\")\n\014UuidPr" +
|
||||
"otocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Met" +
|
||||
"adataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value" +
|
||||
"\030\002 \002(\014\"1\n\017AddressProtocol\022\020\n\010hostname\030\001 " +
|
||||
"\002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProtocol\022\021" +
|
||||
"\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"\253\001\n!R" +
|
||||
"emoteSystemDaemonMessageProtocol\0223\n\013mess",
|
||||
"ageType\030\001 \002(\0162\036.RemoteSystemDaemonMessag" +
|
||||
"eType\022\021\n\tactorPath\030\002 \001(\t\022\017\n\007payload\030\003 \001(" +
|
||||
"\014\022-\n\026replicateActorFromUuid\030\004 \001(\0132\r.Uuid" +
|
||||
"Protocol\"y\n\035DurableMailboxMessageProtoco" +
|
||||
"l\022$\n\trecipient\030\001 \002(\0132\021.ActorRefProtocol\022" +
|
||||
"!\n\006sender\030\002 \001(\0132\021.ActorRefProtocol\022\017\n\007me" +
|
||||
"ssage\030\003 \002(\014*(\n\013CommandType\022\013\n\007CONNECT\020\001\022" +
|
||||
"\014\n\010SHUTDOWN\020\002*K\n\026ReplicationStorageType\022" +
|
||||
"\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tD" +
|
||||
"ATA_GRID\020\003*>\n\027ReplicationStrategyType\022\021\n",
|
||||
"\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035R" +
|
||||
"emoteSystemDaemonMessageType\022\010\n\004STOP\020\001\022\007" +
|
||||
"\n\003USE\020\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004" +
|
||||
"\022\024\n\020MAKE_UNAVAILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r" +
|
||||
"\n\tRECONNECT\020\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n" +
|
||||
"\025FAIL_OVER_CONNECTIONS\020\024\022\026\n\022FUNCTION_FUN" +
|
||||
"0_UNIT\020\025\022\025\n\021FUNCTION_FUN0_ANY\020\026\022\032\n\026FUNCT" +
|
||||
"ION_FUN1_ARG_UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG" +
|
||||
"_ANY\020\030B\017\n\013akka.remoteH\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
|
@ -6928,7 +6760,7 @@ public final class RemoteProtocol {
|
|||
internal_static_ActorRefProtocol_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_ActorRefProtocol_descriptor,
|
||||
new java.lang.String[] { "Host", "Port", "Path", },
|
||||
new java.lang.String[] { "Path", },
|
||||
akka.remote.RemoteProtocol.ActorRefProtocol.class,
|
||||
akka.remote.RemoteProtocol.ActorRefProtocol.Builder.class);
|
||||
internal_static_MessageProtocol_descriptor =
|
||||
|
|
|
|||
|
|
@ -66,9 +66,7 @@ enum ReplicationStrategyType {
|
|||
* on the original node.
|
||||
*/
|
||||
message ActorRefProtocol {
|
||||
required string host = 1;
|
||||
required uint32 port = 2;
|
||||
required string path = 3;
|
||||
required string path = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -249,11 +249,8 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
|
|||
val provider = remote.system.asInstanceOf[ActorSystemImpl].provider
|
||||
|
||||
lazy val sender: ActorRef =
|
||||
if (input.hasSender)
|
||||
provider.deserialize(
|
||||
SerializedActorRef(input.getSender.getHost, input.getSender.getPort, input.getSender.getPath)).getOrElse(throw new IllegalStateException("OHNOES"))
|
||||
else
|
||||
remote.system.deadLetters
|
||||
if (input.hasSender) provider.actorFor(input.getSender.getPath)
|
||||
else remote.system.deadLetters
|
||||
|
||||
lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath)
|
||||
|
||||
|
|
@ -302,8 +299,7 @@ trait RemoteMarshallingOps {
|
|||
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
|
||||
*/
|
||||
def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = {
|
||||
val rep = system.asInstanceOf[ActorSystemImpl].provider.serialize(actor)
|
||||
ActorRefProtocol.newBuilder.setHost(rep.hostname).setPort(rep.port).setPath(rep.path).build
|
||||
ActorRefProtocol.newBuilder.setPath(actor.path.toString).build
|
||||
}
|
||||
|
||||
def createRemoteMessageProtocolBuilder(
|
||||
|
|
|
|||
|
|
@ -193,21 +193,6 @@ class RemoteActorRefProvider(
|
|||
*/
|
||||
private[akka] def evict(path: ActorPath): Boolean = actors.remove(path) ne null
|
||||
|
||||
private[akka] def serialize(actor: ActorRef): SerializedActorRef = actor match {
|
||||
case r: RemoteActorRef ⇒ new SerializedActorRef(r.remoteAddress, actor.path.toString)
|
||||
case other ⇒ local.serialize(actor)
|
||||
}
|
||||
|
||||
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
|
||||
val remoteAddress = RemoteAddress(systemName, actor.hostname, actor.port)
|
||||
if (optimizeLocalScoped_? && remoteAddress == remote.remoteAddress) {
|
||||
Some(local.actorFor(actor.path))
|
||||
} else {
|
||||
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", remote.remoteAddress, actor.path, remoteAddress)
|
||||
Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / actor.path, None)) // FIXME I know, this is broken
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Using (checking out) actor on a specific node.
|
||||
*/
|
||||
|
|
@ -226,7 +211,7 @@ class RemoteActorRefProvider(
|
|||
.setPayload(ByteString.copyFrom(actorFactoryBytes))
|
||||
.build()
|
||||
|
||||
val connectionFactory = () ⇒ deserialize(new SerializedActorRef(remoteAddress, remote.remoteDaemon.path.toString)).get
|
||||
val connectionFactory = () ⇒ actorFor(RootActorPath(remoteAddress) / remote.remoteDaemon.path.pathElements)
|
||||
|
||||
// try to get the connection for the remote address, if not already there then create it
|
||||
val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory)
|
||||
|
|
@ -311,7 +296,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
}
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = provider.serialize(this)
|
||||
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
|
||||
|
||||
def startsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
akka {
|
||||
qakka {
|
||||
loglevel = "WARNING"
|
||||
actor {
|
||||
provider = "akka.remote.RemoteActorRefProvider"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue