diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index bcb9e872e0..739eee6234 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -488,6 +488,7 @@ class LocalActorRefProvider( // chain death watchers so that killing guardian stops the application deathWatch.subscribe(systemGuardian, guardian) deathWatch.subscribe(rootGuardian, systemGuardian) + eventStream.startDefaultLoggers(_system) } def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 31caf6083b..4c94b53c82 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -418,10 +418,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def /(path: Iterable[String]): ActorPath = guardian.path / path private lazy val _start: this.type = { + // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) deadLetters.init(dispatcher, provider.rootPath) // this starts the reaper actor and the user-configured logging subscribers, which are also actors - eventStream.startDefaultLoggers(this) registerOnTermination(stopScheduler()) loadExtensions() if (LogConfigOnStart) logConfiguration() diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index ed88921e16..4cea1871b5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -53,6 +53,7 @@ object SystemMessage { * ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ */ sealed trait SystemMessage extends PossiblyHarmful { + @transient var next: SystemMessage = _ } case class Create() extends SystemMessage // send to self from Dispatcher.register diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index 7f10eb6987..67e260eef9 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -922,16 +922,11 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.ActorRefProtocol getRecipient(); akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getRecipientOrBuilder(); - // optional .MessageProtocol message = 2; + // required .MessageProtocol message = 2; boolean hasMessage(); akka.remote.RemoteProtocol.MessageProtocol getMessage(); akka.remote.RemoteProtocol.MessageProtocolOrBuilder getMessageOrBuilder(); - // optional .ExceptionProtocol exception = 3; - boolean hasException(); - akka.remote.RemoteProtocol.ExceptionProtocol getException(); - akka.remote.RemoteProtocol.ExceptionProtocolOrBuilder getExceptionOrBuilder(); - // optional .ActorRefProtocol sender = 4; boolean hasSender(); akka.remote.RemoteProtocol.ActorRefProtocol getSender(); @@ -989,7 +984,7 @@ public final class RemoteProtocol { return recipient_; } - // optional .MessageProtocol message = 2; + // required .MessageProtocol message = 2; public static final int MESSAGE_FIELD_NUMBER = 2; private akka.remote.RemoteProtocol.MessageProtocol message_; public boolean hasMessage() { @@ -1002,24 +997,11 @@ public final class RemoteProtocol { return message_; } - // optional .ExceptionProtocol exception = 3; - public static final int EXCEPTION_FIELD_NUMBER = 3; - private akka.remote.RemoteProtocol.ExceptionProtocol exception_; - public boolean hasException() { - return ((bitField0_ & 0x00000004) == 0x00000004); - } - public akka.remote.RemoteProtocol.ExceptionProtocol getException() { - return exception_; - } - public akka.remote.RemoteProtocol.ExceptionProtocolOrBuilder getExceptionOrBuilder() { - return exception_; - } - // optional .ActorRefProtocol sender = 4; public static final int SENDER_FIELD_NUMBER = 4; private akka.remote.RemoteProtocol.ActorRefProtocol sender_; public boolean hasSender() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000004) == 0x00000004); } public akka.remote.RemoteProtocol.ActorRefProtocol getSender() { return sender_; @@ -1052,7 +1034,6 @@ public final class RemoteProtocol { private void initFields() { recipient_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); message_ = akka.remote.RemoteProtocol.MessageProtocol.getDefaultInstance(); - exception_ = akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance(); sender_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); metadata_ = java.util.Collections.emptyList(); } @@ -1065,21 +1046,17 @@ public final class RemoteProtocol { memoizedIsInitialized = 0; return false; } + if (!hasMessage()) { + memoizedIsInitialized = 0; + return false; + } if (!getRecipient().isInitialized()) { memoizedIsInitialized = 0; return false; } - if (hasMessage()) { - if (!getMessage().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } - } - if (hasException()) { - if (!getException().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } + if (!getMessage().isInitialized()) { + memoizedIsInitialized = 0; + return false; } if (hasSender()) { if (!getSender().isInitialized()) { @@ -1107,9 +1084,6 @@ public final class RemoteProtocol { output.writeMessage(2, message_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeMessage(3, exception_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeMessage(4, sender_); } for (int i = 0; i < metadata_.size(); i++) { @@ -1133,10 +1107,6 @@ public final class RemoteProtocol { .computeMessageSize(2, message_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - size += com.google.protobuf.CodedOutputStream - .computeMessageSize(3, exception_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { size += com.google.protobuf.CodedOutputStream .computeMessageSize(4, sender_); } @@ -1262,7 +1232,6 @@ public final class RemoteProtocol { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getRecipientFieldBuilder(); getMessageFieldBuilder(); - getExceptionFieldBuilder(); getSenderFieldBuilder(); getMetadataFieldBuilder(); } @@ -1285,21 +1254,15 @@ public final class RemoteProtocol { messageBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000002); - if (exceptionBuilder_ == null) { - exception_ = akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance(); - } else { - exceptionBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000004); if (senderBuilder_ == null) { sender_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); } else { senderBuilder_.clear(); } - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000004); if (metadataBuilder_ == null) { metadata_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); } else { metadataBuilder_.clear(); } @@ -1360,23 +1323,15 @@ public final class RemoteProtocol { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - if (exceptionBuilder_ == null) { - result.exception_ = exception_; - } else { - result.exception_ = exceptionBuilder_.build(); - } - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } if (senderBuilder_ == null) { result.sender_ = sender_; } else { result.sender_ = senderBuilder_.build(); } if (metadataBuilder_ == null) { - if (((bitField0_ & 0x00000010) == 0x00000010)) { + if (((bitField0_ & 0x00000008) == 0x00000008)) { metadata_ = java.util.Collections.unmodifiableList(metadata_); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); } result.metadata_ = metadata_; } else { @@ -1404,9 +1359,6 @@ public final class RemoteProtocol { if (other.hasMessage()) { mergeMessage(other.getMessage()); } - if (other.hasException()) { - mergeException(other.getException()); - } if (other.hasSender()) { mergeSender(other.getSender()); } @@ -1414,7 +1366,7 @@ public final class RemoteProtocol { if (!other.metadata_.isEmpty()) { if (metadata_.isEmpty()) { metadata_ = other.metadata_; - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); } else { ensureMetadataIsMutable(); metadata_.addAll(other.metadata_); @@ -1427,7 +1379,7 @@ public final class RemoteProtocol { metadataBuilder_.dispose(); metadataBuilder_ = null; metadata_ = other.metadata_; - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); metadataBuilder_ = com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? getMetadataFieldBuilder() : null; @@ -1445,21 +1397,17 @@ public final class RemoteProtocol { return false; } + if (!hasMessage()) { + + return false; + } if (!getRecipient().isInitialized()) { return false; } - if (hasMessage()) { - if (!getMessage().isInitialized()) { - - return false; - } - } - if (hasException()) { - if (!getException().isInitialized()) { - - return false; - } + if (!getMessage().isInitialized()) { + + return false; } if (hasSender()) { if (!getSender().isInitialized()) { @@ -1517,15 +1465,6 @@ public final class RemoteProtocol { setMessage(subBuilder.buildPartial()); break; } - case 26: { - akka.remote.RemoteProtocol.ExceptionProtocol.Builder subBuilder = akka.remote.RemoteProtocol.ExceptionProtocol.newBuilder(); - if (hasException()) { - subBuilder.mergeFrom(getException()); - } - input.readMessage(subBuilder, extensionRegistry); - setException(subBuilder.buildPartial()); - break; - } case 34: { akka.remote.RemoteProtocol.ActorRefProtocol.Builder subBuilder = akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(); if (hasSender()) { @@ -1637,7 +1576,7 @@ public final class RemoteProtocol { return recipientBuilder_; } - // optional .MessageProtocol message = 2; + // required .MessageProtocol message = 2; private akka.remote.RemoteProtocol.MessageProtocol message_ = akka.remote.RemoteProtocol.MessageProtocol.getDefaultInstance(); private com.google.protobuf.SingleFieldBuilder< akka.remote.RemoteProtocol.MessageProtocol, akka.remote.RemoteProtocol.MessageProtocol.Builder, akka.remote.RemoteProtocol.MessageProtocolOrBuilder> messageBuilder_; @@ -1727,102 +1666,12 @@ public final class RemoteProtocol { return messageBuilder_; } - // optional .ExceptionProtocol exception = 3; - private akka.remote.RemoteProtocol.ExceptionProtocol exception_ = akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance(); - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ExceptionProtocol, akka.remote.RemoteProtocol.ExceptionProtocol.Builder, akka.remote.RemoteProtocol.ExceptionProtocolOrBuilder> exceptionBuilder_; - public boolean hasException() { - return ((bitField0_ & 0x00000004) == 0x00000004); - } - public akka.remote.RemoteProtocol.ExceptionProtocol getException() { - if (exceptionBuilder_ == null) { - return exception_; - } else { - return exceptionBuilder_.getMessage(); - } - } - public Builder setException(akka.remote.RemoteProtocol.ExceptionProtocol value) { - if (exceptionBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - exception_ = value; - onChanged(); - } else { - exceptionBuilder_.setMessage(value); - } - bitField0_ |= 0x00000004; - return this; - } - public Builder setException( - akka.remote.RemoteProtocol.ExceptionProtocol.Builder builderForValue) { - if (exceptionBuilder_ == null) { - exception_ = builderForValue.build(); - onChanged(); - } else { - exceptionBuilder_.setMessage(builderForValue.build()); - } - bitField0_ |= 0x00000004; - return this; - } - public Builder mergeException(akka.remote.RemoteProtocol.ExceptionProtocol value) { - if (exceptionBuilder_ == null) { - if (((bitField0_ & 0x00000004) == 0x00000004) && - exception_ != akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance()) { - exception_ = - akka.remote.RemoteProtocol.ExceptionProtocol.newBuilder(exception_).mergeFrom(value).buildPartial(); - } else { - exception_ = value; - } - onChanged(); - } else { - exceptionBuilder_.mergeFrom(value); - } - bitField0_ |= 0x00000004; - return this; - } - public Builder clearException() { - if (exceptionBuilder_ == null) { - exception_ = akka.remote.RemoteProtocol.ExceptionProtocol.getDefaultInstance(); - onChanged(); - } else { - exceptionBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000004); - return this; - } - public akka.remote.RemoteProtocol.ExceptionProtocol.Builder getExceptionBuilder() { - bitField0_ |= 0x00000004; - onChanged(); - return getExceptionFieldBuilder().getBuilder(); - } - public akka.remote.RemoteProtocol.ExceptionProtocolOrBuilder getExceptionOrBuilder() { - if (exceptionBuilder_ != null) { - return exceptionBuilder_.getMessageOrBuilder(); - } else { - return exception_; - } - } - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ExceptionProtocol, akka.remote.RemoteProtocol.ExceptionProtocol.Builder, akka.remote.RemoteProtocol.ExceptionProtocolOrBuilder> - getExceptionFieldBuilder() { - if (exceptionBuilder_ == null) { - exceptionBuilder_ = new com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ExceptionProtocol, akka.remote.RemoteProtocol.ExceptionProtocol.Builder, akka.remote.RemoteProtocol.ExceptionProtocolOrBuilder>( - exception_, - getParentForChildren(), - isClean()); - exception_ = null; - } - return exceptionBuilder_; - } - // optional .ActorRefProtocol sender = 4; private akka.remote.RemoteProtocol.ActorRefProtocol sender_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); private com.google.protobuf.SingleFieldBuilder< akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> senderBuilder_; public boolean hasSender() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000004) == 0x00000004); } public akka.remote.RemoteProtocol.ActorRefProtocol getSender() { if (senderBuilder_ == null) { @@ -1841,7 +1690,7 @@ public final class RemoteProtocol { } else { senderBuilder_.setMessage(value); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; return this; } public Builder setSender( @@ -1852,12 +1701,12 @@ public final class RemoteProtocol { } else { senderBuilder_.setMessage(builderForValue.build()); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; return this; } public Builder mergeSender(akka.remote.RemoteProtocol.ActorRefProtocol value) { if (senderBuilder_ == null) { - if (((bitField0_ & 0x00000008) == 0x00000008) && + if (((bitField0_ & 0x00000004) == 0x00000004) && sender_ != akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) { sender_ = akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(sender_).mergeFrom(value).buildPartial(); @@ -1868,7 +1717,7 @@ public final class RemoteProtocol { } else { senderBuilder_.mergeFrom(value); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; return this; } public Builder clearSender() { @@ -1878,11 +1727,11 @@ public final class RemoteProtocol { } else { senderBuilder_.clear(); } - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000004); return this; } public akka.remote.RemoteProtocol.ActorRefProtocol.Builder getSenderBuilder() { - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; onChanged(); return getSenderFieldBuilder().getBuilder(); } @@ -1911,9 +1760,9 @@ public final class RemoteProtocol { private java.util.List metadata_ = java.util.Collections.emptyList(); private void ensureMetadataIsMutable() { - if (!((bitField0_ & 0x00000010) == 0x00000010)) { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { metadata_ = new java.util.ArrayList(metadata_); - bitField0_ |= 0x00000010; + bitField0_ |= 0x00000008; } } @@ -2029,7 +1878,7 @@ public final class RemoteProtocol { public Builder clearMetadata() { if (metadataBuilder_ == null) { metadata_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); onChanged(); } else { metadataBuilder_.clear(); @@ -2085,7 +1934,7 @@ public final class RemoteProtocol { metadataBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< akka.remote.RemoteProtocol.MetadataEntryProtocol, akka.remote.RemoteProtocol.MetadataEntryProtocol.Builder, akka.remote.RemoteProtocol.MetadataEntryProtocolOrBuilder>( metadata_, - ((bitField0_ & 0x00000010) == 0x00000010), + ((bitField0_ & 0x00000008) == 0x00000008), getParentForChildren(), isClean()); metadata_ = null; @@ -4365,11 +4214,15 @@ public final class RemoteProtocol { public interface AddressProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required string hostname = 1; + // required string system = 1; + boolean hasSystem(); + String getSystem(); + + // required string hostname = 2; boolean hasHostname(); String getHostname(); - // required uint32 port = 2; + // required uint32 port = 3; boolean hasPort(); int getPort(); } @@ -4402,11 +4255,43 @@ public final class RemoteProtocol { } private int bitField0_; - // required string hostname = 1; - public static final int HOSTNAME_FIELD_NUMBER = 1; + // required string system = 1; + public static final int SYSTEM_FIELD_NUMBER = 1; + private java.lang.Object system_; + public boolean hasSystem() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getSystem() { + java.lang.Object ref = system_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + system_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getSystemBytes() { + java.lang.Object ref = system_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + system_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required string hostname = 2; + public static final int HOSTNAME_FIELD_NUMBER = 2; private java.lang.Object hostname_; public boolean hasHostname() { - return ((bitField0_ & 0x00000001) == 0x00000001); + return ((bitField0_ & 0x00000002) == 0x00000002); } public String getHostname() { java.lang.Object ref = hostname_; @@ -4434,17 +4319,18 @@ public final class RemoteProtocol { } } - // required uint32 port = 2; - public static final int PORT_FIELD_NUMBER = 2; + // required uint32 port = 3; + public static final int PORT_FIELD_NUMBER = 3; private int port_; public boolean hasPort() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000004) == 0x00000004); } public int getPort() { return port_; } private void initFields() { + system_ = ""; hostname_ = ""; port_ = 0; } @@ -4453,6 +4339,10 @@ public final class RemoteProtocol { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; + if (!hasSystem()) { + memoizedIsInitialized = 0; + return false; + } if (!hasHostname()) { memoizedIsInitialized = 0; return false; @@ -4469,10 +4359,13 @@ public final class RemoteProtocol { throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getHostnameBytes()); + output.writeBytes(1, getSystemBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeUInt32(2, port_); + output.writeBytes(2, getHostnameBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeUInt32(3, port_); } getUnknownFields().writeTo(output); } @@ -4485,11 +4378,15 @@ public final class RemoteProtocol { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getHostnameBytes()); + .computeBytesSize(1, getSystemBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeUInt32Size(2, port_); + .computeBytesSize(2, getHostnameBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(3, port_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -4601,7 +4498,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -4615,10 +4512,12 @@ public final class RemoteProtocol { public Builder clear() { super.clear(); - hostname_ = ""; + system_ = ""; bitField0_ = (bitField0_ & ~0x00000001); - port_ = 0; + hostname_ = ""; bitField0_ = (bitField0_ & ~0x00000002); + port_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -4660,10 +4559,14 @@ public final class RemoteProtocol { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.hostname_ = hostname_; + result.system_ = system_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } + result.hostname_ = hostname_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } result.port_ = port_; result.bitField0_ = to_bitField0_; onBuilt(); @@ -4681,6 +4584,9 @@ public final class RemoteProtocol { public Builder mergeFrom(akka.remote.RemoteProtocol.AddressProtocol other) { if (other == akka.remote.RemoteProtocol.AddressProtocol.getDefaultInstance()) return this; + if (other.hasSystem()) { + setSystem(other.getSystem()); + } if (other.hasHostname()) { setHostname(other.getHostname()); } @@ -4692,6 +4598,10 @@ public final class RemoteProtocol { } public final boolean isInitialized() { + if (!hasSystem()) { + + return false; + } if (!hasHostname()) { return false; @@ -4728,11 +4638,16 @@ public final class RemoteProtocol { } case 10: { bitField0_ |= 0x00000001; + system_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; hostname_ = input.readBytes(); break; } - case 16: { - bitField0_ |= 0x00000002; + case 24: { + bitField0_ |= 0x00000004; port_ = input.readUInt32(); break; } @@ -4742,10 +4657,46 @@ public final class RemoteProtocol { private int bitField0_; - // required string hostname = 1; + // required string system = 1; + private java.lang.Object system_ = ""; + public boolean hasSystem() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getSystem() { + java.lang.Object ref = system_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + system_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setSystem(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + system_ = value; + onChanged(); + return this; + } + public Builder clearSystem() { + bitField0_ = (bitField0_ & ~0x00000001); + system_ = getDefaultInstance().getSystem(); + onChanged(); + return this; + } + void setSystem(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000001; + system_ = value; + onChanged(); + } + + // required string hostname = 2; private java.lang.Object hostname_ = ""; public boolean hasHostname() { - return ((bitField0_ & 0x00000001) == 0x00000001); + return ((bitField0_ & 0x00000002) == 0x00000002); } public String getHostname() { java.lang.Object ref = hostname_; @@ -4761,39 +4712,39 @@ public final class RemoteProtocol { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000001; + bitField0_ |= 0x00000002; hostname_ = value; onChanged(); return this; } public Builder clearHostname() { - bitField0_ = (bitField0_ & ~0x00000001); + bitField0_ = (bitField0_ & ~0x00000002); hostname_ = getDefaultInstance().getHostname(); onChanged(); return this; } void setHostname(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000001; + bitField0_ |= 0x00000002; hostname_ = value; onChanged(); } - // required uint32 port = 2; + // required uint32 port = 3; private int port_ ; public boolean hasPort() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000004) == 0x00000004); } public int getPort() { return port_; } public Builder setPort(int value) { - bitField0_ |= 0x00000002; + bitField0_ |= 0x00000004; port_ = value; onChanged(); return this; } public Builder clearPort() { - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000004); port_ = 0; onChanged(); return this; @@ -5071,7 +5022,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -5584,7 +5535,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -6216,7 +6167,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -6686,45 +6637,45 @@ public final class RemoteProtocol { descriptor; static { java.lang.String[] descriptorData = { - "\n\024RemoteProtocol.proto\"j\n\022AkkaRemoteProt" + - "ocol\022\'\n\007message\030\001 \001(\0132\026.RemoteMessagePro" + - "tocol\022+\n\013instruction\030\002 \001(\0132\026.RemoteContr" + - "olProtocol\"\324\001\n\025RemoteMessageProtocol\022$\n\t" + - "recipient\030\001 \002(\0132\021.ActorRefProtocol\022!\n\007me" + - "ssage\030\002 \001(\0132\020.MessageProtocol\022%\n\texcepti" + - "on\030\003 \001(\0132\022.ExceptionProtocol\022!\n\006sender\030\004" + - " \001(\0132\021.ActorRefProtocol\022(\n\010metadata\030\005 \003(" + - "\0132\026.MetadataEntryProtocol\"l\n\025RemoteContr" + - "olProtocol\022!\n\013commandType\030\001 \002(\0162\014.Comman", - "dType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origin\030\003 \001(\0132\020." + - "AddressProtocol\" \n\020ActorRefProtocol\022\014\n\004p" + - "ath\030\001 \002(\t\";\n\017MessageProtocol\022\017\n\007message\030" + - "\001 \002(\014\022\027\n\017messageManifest\030\002 \001(\014\")\n\014UuidPr" + - "otocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Met" + - "adataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value" + - "\030\002 \002(\014\"1\n\017AddressProtocol\022\020\n\010hostname\030\001 " + - "\002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProtocol\022\021" + - "\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"\253\001\n!R" + - "emoteSystemDaemonMessageProtocol\0223\n\013mess", - "ageType\030\001 \002(\0162\036.RemoteSystemDaemonMessag" + - "eType\022\021\n\tactorPath\030\002 \001(\t\022\017\n\007payload\030\003 \001(" + - "\014\022-\n\026replicateActorFromUuid\030\004 \001(\0132\r.Uuid" + - "Protocol\"y\n\035DurableMailboxMessageProtoco" + - "l\022$\n\trecipient\030\001 \002(\0132\021.ActorRefProtocol\022" + - "!\n\006sender\030\002 \001(\0132\021.ActorRefProtocol\022\017\n\007me" + - "ssage\030\003 \002(\014*(\n\013CommandType\022\013\n\007CONNECT\020\001\022" + - "\014\n\010SHUTDOWN\020\002*K\n\026ReplicationStorageType\022" + - "\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tD" + - "ATA_GRID\020\003*>\n\027ReplicationStrategyType\022\021\n", - "\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035R" + - "emoteSystemDaemonMessageType\022\010\n\004STOP\020\001\022\007" + - "\n\003USE\020\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004" + - "\022\024\n\020MAKE_UNAVAILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r" + - "\n\tRECONNECT\020\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n" + - "\025FAIL_OVER_CONNECTIONS\020\024\022\026\n\022FUNCTION_FUN" + - "0_UNIT\020\025\022\025\n\021FUNCTION_FUN0_ANY\020\026\022\032\n\026FUNCT" + - "ION_FUN1_ARG_UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG" + - "_ANY\020\030B\017\n\013akka.remoteH\001" + "\n\035protocol/RemoteProtocol.proto\"j\n\022AkkaR" + + "emoteProtocol\022\'\n\007message\030\001 \001(\0132\026.RemoteM" + + "essageProtocol\022+\n\013instruction\030\002 \001(\0132\026.Re" + + "moteControlProtocol\"\255\001\n\025RemoteMessagePro" + + "tocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRefProto" + + "col\022!\n\007message\030\002 \002(\0132\020.MessageProtocol\022!" + + "\n\006sender\030\004 \001(\0132\021.ActorRefProtocol\022(\n\010met" + + "adata\030\005 \003(\0132\026.MetadataEntryProtocol\"l\n\025R" + + "emoteControlProtocol\022!\n\013commandType\030\001 \002(" + + "\0162\014.CommandType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origi", + "n\030\003 \001(\0132\020.AddressProtocol\" \n\020ActorRefPro" + + "tocol\022\014\n\004path\030\001 \002(\t\";\n\017MessageProtocol\022\017" + + "\n\007message\030\001 \002(\014\022\027\n\017messageManifest\030\002 \001(\014" + + "\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 " + + "\002(\004\"3\n\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(" + + "\t\022\r\n\005value\030\002 \002(\014\"A\n\017AddressProtocol\022\016\n\006s" + + "ystem\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 " + + "\002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 " + + "\002(\t\022\017\n\007message\030\002 \002(\t\"\253\001\n!RemoteSystemDae" + + "monMessageProtocol\0223\n\013messageType\030\001 \002(\0162", + "\036.RemoteSystemDaemonMessageType\022\021\n\tactor" + + "Path\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026replicate" + + "ActorFromUuid\030\004 \001(\0132\r.UuidProtocol\"y\n\035Du" + + "rableMailboxMessageProtocol\022$\n\trecipient" + + "\030\001 \002(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 \001(" + + "\0132\021.ActorRefProtocol\022\017\n\007message\030\003 \002(\014*(\n" + + "\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002*" + + "K\n\026ReplicationStorageType\022\r\n\tTRANSIENT\020\001" + + "\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA_GRID\020\003*>\n\027" + + "ReplicationStrategyType\022\021\n\rWRITE_THROUGH", + "\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035RemoteSystemDae" + + "monMessageType\022\010\n\004STOP\020\001\022\007\n\003USE\020\002\022\013\n\007REL" + + "EASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004\022\024\n\020MAKE_UNAVA" + + "ILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r\n\tRECONNECT\020\007\022" + + "\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n\025FAIL_OVER_CON" + + "NECTIONS\020\024\022\026\n\022FUNCTION_FUN0_UNIT\020\025\022\025\n\021FU" + + "NCTION_FUN0_ANY\020\026\022\032\n\026FUNCTION_FUN1_ARG_U" + + "NIT\020\027\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\030B\017\n\013akka" + + ".remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6744,7 +6695,7 @@ public final class RemoteProtocol { internal_static_RemoteMessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteMessageProtocol_descriptor, - new java.lang.String[] { "Recipient", "Message", "Exception", "Sender", "Metadata", }, + new java.lang.String[] { "Recipient", "Message", "Sender", "Metadata", }, akka.remote.RemoteProtocol.RemoteMessageProtocol.class, akka.remote.RemoteProtocol.RemoteMessageProtocol.Builder.class); internal_static_RemoteControlProtocol_descriptor = @@ -6792,7 +6743,7 @@ public final class RemoteProtocol { internal_static_AddressProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddressProtocol_descriptor, - new java.lang.String[] { "Hostname", "Port", }, + new java.lang.String[] { "System", "Hostname", "Port", }, akka.remote.RemoteProtocol.AddressProtocol.class, akka.remote.RemoteProtocol.AddressProtocol.Builder.class); internal_static_ExceptionProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 6fdc9acaaf..efd605a108 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -21,8 +21,7 @@ message AkkaRemoteProtocol { */ message RemoteMessageProtocol { required ActorRefProtocol recipient = 1; - optional MessageProtocol message = 2; - optional ExceptionProtocol exception = 3; + required MessageProtocol message = 2; optional ActorRefProtocol sender = 4; repeated MetadataEntryProtocol metadata = 5; } @@ -97,8 +96,9 @@ message MetadataEntryProtocol { * Defines a remote address. */ message AddressProtocol { - required string hostname = 1; - required uint32 port = 2; + required string system = 1; + required string hostname = 2; + required uint32 port = 3; } /** diff --git a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala index 87dda83b71..f2ebdb0cc0 100644 --- a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala @@ -25,12 +25,6 @@ import akka.actor.ActorSystem */ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 1000) { - def this(system: ActorSystem) { - this( - RemoteExtension(system).FailureDetectorThreshold, - RemoteExtension(system).FailureDetectorMaxSampleSize) - } - private final val PhiFactor = 1.0 / math.log(10.0) private case class FailureStats(mean: Double = 0.0D, variance: Double = 0.0D, deviation: Double = 0.0D) diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index cc52403f9f..3335656e14 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -106,14 +106,14 @@ class Gossiper(remote: Remote) { nodeMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener]) private val system = remote.system - private val remoteExtension = RemoteExtension(system) + private val remoteSettings = remote.remoteSettings private val serialization = SerializationExtension(system) private val log = Logging(system, "Gossiper") private val failureDetector = remote.failureDetector private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef]) private val seeds = { - val seeds = remoteExtension.SeedNodes + val seeds = remoteSettings.SeedNodes if (seeds.isEmpty) throw new ConfigurationException( "At least one seed node must be defined in the configuration [akka.cluster.seed-nodes]") else seeds @@ -123,8 +123,8 @@ class Gossiper(remote: Remote) { private val nodeFingerprint = address.## private val random = SecureRandom.getInstance("SHA1PRNG") - private val initalDelayForGossip = remoteExtension.InitalDelayForGossip - private val gossipFrequency = remoteExtension.GossipFrequency + private val initalDelayForGossip = remoteSettings.InitalDelayForGossip + private val gossipFrequency = remoteSettings.GossipFrequency private val state = new AtomicReference[State](State(currentGossip = newGossip())) @@ -245,7 +245,7 @@ class Gossiper(remote: Remote) { throw new IllegalStateException("Connection for [" + peer + "] is not set up")) try { - (connection ? (toRemoteMessage(newGossip), remoteExtension.RemoteSystemDaemonAckTimeout)).as[Status] match { + (connection ? (toRemoteMessage(newGossip), remoteSettings.RemoteSystemDaemonAckTimeout)).as[Status] match { case Some(Success(receiver)) ⇒ log.debug("Gossip sent to [{}] was successfully received", receiver) diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 4bf96bd823..bfbe3aa658 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -22,26 +22,27 @@ import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.TimeUnit.MILLISECONDS import akka.serialization.SerializationExtension +import akka.dispatch.SystemMessage +import akka.event.LoggingAdapter /** * Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc. * * @author Jonas Bonér */ -class Remote(val system: ActorSystemImpl, val nodename: String) { +class Remote(val system: ActorSystemImpl, val nodename: String, val remoteSettings: RemoteSettings) { val log = Logging(system, "Remote") import system._ import settings._ - private[remote] val remoteExtension = RemoteExtension(system) private[remote] val serialization = SerializationExtension(system) private[remote] val remoteAddress = { - RemoteAddress(system.name, remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port) + RemoteAddress(system.name, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port) } - val failureDetector = new AccrualFailureDetector(system) + val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize) // val gossiper = new Gossiper(this) @@ -50,17 +51,21 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { val computeGridDispatcher = dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher") // FIXME it is probably better to create another supervisor for handling the children created by handle_*, ticket #1408 - private[remote] lazy val remoteDaemonSupervisor = system.actorOf(Props( - OneForOneStrategy(List(classOf[Exception]), None, None)), "akka-system-remote-supervisor") // is infinite restart what we want? + val remoteDaemonSupervisor = + system.provider.actorOf(system, + Props(OneForOneStrategy(List(classOf[Exception]), None, None)), + system.provider.rootGuardian, + "akka-system-remote-supervisor", + systemService = true) // is infinite restart what we want? - private[remote] lazy val remoteDaemon = + val remoteDaemon = system.provider.actorOf(system, Props(new RemoteSystemDaemon(this)).withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)), - remoteDaemonSupervisor.asInstanceOf[InternalActorRef], + remoteDaemonSupervisor, remoteDaemonServiceName, systemService = true) - private[remote] lazy val remoteClientLifeCycleHandler = system.actorOf(Props(new Actor { + val remoteClientLifeCycleHandler = system.actorOf(Props(new Actor { def receive = { case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address) case RemoteClientDisconnected(remote, address) ⇒ remote.shutdownClientConnection(address) @@ -68,16 +73,16 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { } }), "akka.remote.RemoteClientLifeCycleListener") - lazy val eventStream = new NetworkEventStream(system) + val eventStream = new NetworkEventStream(system) - lazy val server: RemoteSupport = { + val server: RemoteSupport = { val arguments = Seq( classOf[ActorSystem] -> system, classOf[Remote] -> this) val types: Array[Class[_]] = arguments map (_._1) toArray val values: Array[AnyRef] = arguments map (_._2) toArray - ReflectiveAccess.createInstance[RemoteSupport](remoteExtension.RemoteTransport, types, values) match { + ReflectiveAccess.createInstance[RemoteSupport](remoteSettings.RemoteTransport, types, values) match { case Left(problem) ⇒ log.error(problem, "Could not load remote transport layer") throw problem @@ -91,10 +96,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { } } - def start() { - val daemonPath = remoteDaemon.path //Force init of daemon - log.info("Starting remote server on [{}] and starting remoteDaemon with path [{}]", remoteAddress, daemonPath) - } + log.info("Starting remote server on [{}] and starting remoteDaemon {}", remoteAddress, remoteDaemon) } /** @@ -141,7 +143,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { if (message.hasActorPath) { val actorFactoryBytes = - if (remoteExtension.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray + if (remoteSettings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray val actorFactory = serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { @@ -254,39 +256,23 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLoader: Option[ClassLoader] = None) { - val provider = remote.system.asInstanceOf[ActorSystemImpl].provider + def provider = remote.system.asInstanceOf[ActorSystemImpl].provider lazy val sender: ActorRef = if (input.hasSender) provider.actorFor(provider.rootGuardian, input.getSender.getPath) else remote.system.deadLetters - lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath) + lazy val recipient: InternalActorRef = provider.actorFor(provider.rootGuardian, input.getRecipient.getPath) - lazy val payload: Either[Throwable, AnyRef] = - if (input.hasException) Left(parseException()) - else Right(MessageSerializer.deserialize(remote.system, input.getMessage, classLoader)) + lazy val payload: AnyRef = MessageSerializer.deserialize(remote.system, input.getMessage, classLoader) - protected def parseException(): Throwable = { - val exception = input.getException - val classname = exception.getClassname - try { - val exceptionClass = - if (classLoader.isDefined) classLoader.get.loadClass(classname) else Class.forName(classname) - exceptionClass - .getConstructor(Array[Class[_]](classOf[String]): _*) - .newInstance(exception.getMessage).asInstanceOf[Throwable] - } catch { - case problem: Exception ⇒ - remote.system.eventStream.publish(Logging.Error(problem, "RemoteMessage", problem.getMessage)) - CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage) - } - } - - override def toString = "RemoteMessage: " + recipient + "(" + input.getRecipient.getPath + ") from " + sender + override def toString = "RemoteMessage: " + payload + " to " + recipient + " from " + sender } trait RemoteMarshallingOps { + def log: LoggingAdapter + def system: ActorSystem protected def useUntrustedMode: Boolean @@ -311,21 +297,12 @@ trait RemoteMarshallingOps { } def createRemoteMessageProtocolBuilder( - recipient: Either[ActorRef, ActorRefProtocol], - message: Either[Throwable, Any], + recipient: ActorRef, + message: Any, senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { - val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(recipient.fold(toRemoteActorRefProtocol _, identity)) - - message match { - case Right(message) ⇒ - messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef])) - case Left(exception) ⇒ - messageBuilder.setException(ExceptionProtocol.newBuilder - .setClassname(exception.getClass.getName) - .setMessage(Option(exception.getMessage).getOrElse("")) - .build) - } + val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient)) + messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef])) if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) @@ -333,15 +310,20 @@ trait RemoteMarshallingOps { } def receiveMessage(remoteMessage: RemoteMessage) { - val recipient = remoteMessage.recipient + log.debug("received message {}", remoteMessage) - remoteMessage.payload match { - case Left(t) ⇒ throw t - case Right(r) ⇒ r match { - case _: Terminate ⇒ if (useUntrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop() - case _: AutoReceivedMessage if (useUntrustedMode) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") - case m ⇒ recipient.!(m)(remoteMessage.sender) - } + remoteMessage.recipient match { + case l @ (_: LocalActorRef | _: MinimalActorRef) ⇒ + remoteMessage.payload match { + case msg: SystemMessage ⇒ + if (useUntrustedMode) + throw new SecurityException("RemoteModule server is operating is untrusted mode, can not send system message") + else l.sendSystemMessage(msg) + case _: AutoReceivedMessage if (useUntrustedMode) ⇒ + throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") + case m ⇒ l.!(m)(remoteMessage.sender) + } + case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index f4954e919f..44e183ece1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise import java.net.InetAddress import akka.serialization.SerializationExtension +import akka.serialization.Serialization /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -38,41 +39,31 @@ class RemoteActorRefProvider( val log = Logging(eventStream, "RemoteActorRefProvider") + val remoteSettings = new RemoteSettings(settings.config, systemName) + def deathWatch = local.deathWatch def rootGuardian = local.rootGuardian def guardian = local.guardian def systemGuardian = local.systemGuardian - def nodename = remoteExtension.NodeName - def clustername = remoteExtension.ClusterName + def nodename = remoteSettings.NodeName + def clustername = remoteSettings.ClusterName private val actors = new ConcurrentHashMap[String, AnyRef] - /* - * The problem is that ActorRefs need a reference to the ActorSystem to - * provide their service. Hence they cannot be created while the - * constructors of ActorSystem and ActorRefProvider are still running. - * The solution is to split out that last part into an init() method, - * but it also requires these references to be @volatile and lazy. - */ - @volatile - private var system: ActorSystemImpl = _ - private lazy val remoteExtension = RemoteExtension(system) - private lazy val serialization = SerializationExtension(system) - lazy val rootPath: ActorPath = { - val remoteAddress = RemoteAddress(system.name, remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port) - new RootActorPath(remoteAddress) - } - private lazy val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath) - private[akka] lazy val remote = { - val r = new Remote(system, nodename) - terminationFuture.onComplete(_ ⇒ r.server.shutdown()) - r - } - private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) + val rootPath: ActorPath = RootActorPath(RemoteAddress(systemName, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port)) + private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath) + private var serialization: Serialization = _ + private var remoteDaemonConnectionManager: RemoteConnectionManager = _ - def init(_system: ActorSystemImpl) { - system = _system - local.init(_system) + private var _remote: Remote = _ + def remote = _remote + + def init(system: ActorSystemImpl) { + local.init(system) + serialization = SerializationExtension(system) + _remote = new Remote(system, nodename, remoteSettings) + remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) + terminationFuture.onComplete(_ ⇒ remote.server.shutdown()) } private[akka] def terminationFuture = local.terminationFuture @@ -215,7 +206,7 @@ class RemoteActorRefProvider( val actorFactoryBytes = serialization.serialize(actorFactory) match { case Left(error) ⇒ throw error - case Right(bytes) ⇒ if (remoteExtension.ShouldCompressData) LZF.compress(bytes) else bytes + case Right(bytes) ⇒ if (remoteSettings.ShouldCompressData) LZF.compress(bytes) else bytes } val command = RemoteSystemDaemonMessageProtocol.newBuilder @@ -235,7 +226,7 @@ class RemoteActorRefProvider( private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) { if (withACK) { try { - val f = connection ? (command, remoteExtension.RemoteSystemDaemonAckTimeout) + val f = connection ? (command, remoteSettings.RemoteSystemDaemonAckTimeout) (try f.await.value catch { case _: FutureTimeoutException ⇒ None }) match { case Some(Right(receiver)) ⇒ log.debug("Remote system command sent to [{}] successfully received", receiver) @@ -286,27 +277,20 @@ private[akka] class RemoteActorRef private[akka] ( def isTerminated: Boolean = !running - def sendSystemMessage(message: SystemMessage): Unit = throw new UnsupportedOperationException("Not supported for RemoteActorRef") + def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this, loader) override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader) override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout) - def suspend(): Unit = () + def suspend(): Unit = sendSystemMessage(Suspend()) - def resume(): Unit = () + def resume(): Unit = sendSystemMessage(Resume()) - def stop() { - synchronized { - if (running) { - running = false - remote.send(new Terminate(), None, this, loader) - } - } - } + def stop(): Unit = sendSystemMessage(Terminate()) + + def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause)) @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = SerializedActorRef(path.toString) - - def restart(cause: Throwable): Unit = () } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 33ba83dd73..7967813575 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -12,12 +12,7 @@ import com.eaio.uuid.UUID import akka.actor._ import scala.collection.JavaConverters._ -object RemoteExtension extends ExtensionId[RemoteExtensionSettings] with ExtensionIdProvider { - def lookup() = this - def createExtension(system: ActorSystemImpl) = new RemoteExtensionSettings(system.settings.config, system.name) -} - -class RemoteExtensionSettings(val config: Config, val systemName: String) extends Extension { +class RemoteSettings(val config: Config, val systemName: String) extends Extension { import config._ diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 8ceb56b16b..a8ff2ae024 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -60,7 +60,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build) + send(remoteSupport.createRemoteMessageProtocolBuilder(recipient, message, senderOption).build) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress)) @@ -150,6 +150,7 @@ class ActiveRemoteClient private[akka] ( val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get) handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder + .setSystem(senderRemoteAddress.system) .setHostname(senderRemoteAddress.host) .setPort(senderRemoteAddress.port) .build) @@ -352,8 +353,8 @@ class ActiveRemoteClientHandler( class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends RemoteSupport(_system) with RemoteMarshallingOps { val log = Logging(system, "NettyRemoteSupport") - val serverSettings = RemoteExtension(system).serverSettings - val clientSettings = RemoteExtension(system).clientSettings + val serverSettings = remote.remoteSettings.serverSettings + val clientSettings = remote.remoteSettings.clientSettings val timer: HashedWheelTimer = new HashedWheelTimer @@ -464,11 +465,13 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot def isRunning = _isRunning.isOn - def start(loader: Option[ClassLoader] = None): Unit = _isRunning switchOn { - try { - currentServer.set(Some(new NettyRemoteServer(this, loader))) - } catch { - case e: Exception ⇒ notifyListeners(RemoteServerError(e, this)) + def start(loader: Option[ClassLoader] = None): Unit = { + _isRunning switchOn { + try { + currentServer.set(Some(new NettyRemoteServer(this, loader))) + } catch { + case e: Exception ⇒ notifyListeners(RemoteServerError(e, this)) + } } } @@ -519,6 +522,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio val shutdownSignal = { val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder + .setSystem(address.system) .setHostname(address.host) .setPort(address.port) .build) @@ -648,8 +652,7 @@ class RemoteServerHandler( instruction.getCommandType match { case CommandType.CONNECT if UsePassiveConnections ⇒ val origin = instruction.getOrigin - // FIXME RK need to include system-name in remote protocol - val inbound = RemoteAddress("BORKED", origin.getHostname, origin.getPort) + val inbound = RemoteAddress(origin.getSystem, origin.getHostname, origin.getPort) val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound) remoteSupport.bindClient(inbound, client) case CommandType.SHUTDOWN ⇒ //FIXME Dispose passive connection here, ticket #1410 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala index e22cfd7195..d0e251f86f 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala @@ -23,10 +23,6 @@ class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { "___" must { "___" in { - barrier("setup") - - remote.start() - barrier("start") barrier("done") } @@ -41,10 +37,6 @@ class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec with DefaultTi "A new remote actor configured with a Direct router" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - barrier("setup") - - remote.start() - barrier("start") val actor = system.actorOf[SomeActor]("service-hello") diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala index e42594c949..d4d1745bea 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala @@ -22,10 +22,6 @@ class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { "___" must { "___" in { - barrier("setup") - - remote.start() - barrier("start") barrier("done") @@ -41,10 +37,6 @@ class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec with DefaultTimeout { "A new remote actor" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - barrier("setup") - - remote.start() - barrier("start") val actor = system.actorOf[SomeActor]("service-hello") diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala index d985c770c2..41348cad0b 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -21,8 +21,6 @@ class RandomRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -36,8 +34,6 @@ class RandomRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -51,8 +47,6 @@ class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -67,9 +61,6 @@ class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTi "A new remote actor configured with a Random router" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - barrier("setup") - remote.start() - barrier("start") val actor = system.actorOf[SomeActor]("service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index 08c0009f4b..02406a3db1 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -21,8 +21,6 @@ class RoundRobinRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -36,8 +34,6 @@ class RoundRobinRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -51,8 +47,6 @@ class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -67,9 +61,6 @@ class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with Defau "A new remote actor configured with a RoundRobin router" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - barrier("setup") - remote.start() - barrier("start") val actor = system.actorOf[SomeActor]("service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index 764cda813e..f35dd311a9 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -21,8 +21,6 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -36,8 +34,6 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -51,8 +47,6 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec { val nodes = NrOfNodes "___" must { "___" in { - barrier("setup") - remote.start() barrier("start") barrier("broadcast-end") barrier("end") @@ -67,9 +61,6 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with De "A new remote actor configured with a ScatterGather router" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { - barrier("setup") - remote.start() - barrier("start") val actor = system.actorOf[SomeActor]("service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala new file mode 100644 index 0000000000..72ed415e10 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.remote + +import akka.testkit._ +import akka.actor._ +import com.typesafe.config._ + +class RemoteCommunicationSpec extends AkkaSpec(""" +akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + cluster.nodename = Nonsense + loglevel = DEBUG + remote.server { + hostname = localhost + port = 12345 + } +} +""") with ImplicitSender { + + val conf = ConfigFactory.parseString("akka.remote.server.port=12346").withFallback(system.settings.config) + val other = ActorSystem("remote_sys", conf) + + system.eventStream.subscribe(system.actorFor("/system/log1-TestEventListener"), classOf[RemoteLifeCycleEvent]) + other.eventStream.subscribe(other.actorFor("/system/log1-TestEventListener"), classOf[RemoteLifeCycleEvent]) + + val remote = other.actorOf(Props(new Actor { + def receive = { + case "ping" ⇒ sender ! (("pong", sender)) + } + }), "echo") + + val here = system.actorFor("akka://remote_sys@localhost:12346/user/echo") + + implicit val timeout = system.settings.ActorTimeout + + override def atTermination() { + other.stop() + } + + "Remoting" must { + + "support remote look-ups" in { + here ! "ping" + expectMsgPF() { + case ("pong", s: AnyRef) if s eq testActor ⇒ true + } + } + + "send error message for wrong address" in { + EventFilter.error(start = "dropping", occurrences = 1).intercept { + system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping" + }(other) + } + + "support ask" in { + (here ? "ping").get match { + case ("pong", s: AskActorRef) ⇒ // good + case m ⇒ fail(m + " was not (pong, AskActorRef)") + } + } + + "send dead letters on remote if actor does not exist" in { + EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { + system.actorFor("akka://remote_sys@localhost:12346/does/not/exist") ! "buh" + }(other) + } + + } + +} \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 87a213e0eb..36c8ed8f30 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -8,7 +8,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") { "RemoteExtension" must { "be able to parse remote and cluster config elements" in { - val config = RemoteExtension(system).config + val config = system.settings.config import config._ //akka.remote diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index afa174ead6..7da8d84eba 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -464,6 +464,7 @@ class TestEventListener extends Logging.DefaultLogger { val event = Warning(rcp.path.toString, "received dead letter from " + snd + ": " + msg) if (!filter(event)) print(event) } + case m ⇒ print(Debug(context.system.name, m)) } def filter(event: LogEvent): Boolean = filters exists (f ⇒ try { f(event) } catch { case e: Exception ⇒ false })