diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala index 45aa0514f6..722f4e428e 100644 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -121,15 +121,15 @@ object Publish { def forConsumer(actor: ActorRef): Option[Publish] = forConsumeAnnotated(actor) orElse forConsumerType(actor) - private def forConsumeAnnotated(actorId: ActorRef): Option[Publish] = { - val annotation = actorId.actorClass.getAnnotation(classOf[consume]) + private def forConsumeAnnotated(actorRef: ActorRef): Option[Publish] = { + val annotation = actorRef.actorClass.getAnnotation(classOf[consume]) if (annotation eq null) None - else if (actorId.remoteAddress.isDefined) None // do not publish proxies - else Some(Publish(annotation.value, actorId.id, false)) + else if (actorRef.remoteAddress.isDefined) None // do not publish proxies + else Some(Publish(annotation.value, actorRef.id, false)) } - private def forConsumerType(actorId: ActorRef): Option[Publish] = - if (!actorId.actor.isInstanceOf[Consumer]) None - else if (actorId.remoteAddress.isDefined) None - else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true)) + private def forConsumerType(actorRef: ActorRef): Option[Publish] = + if (!actorRef.actor.isInstanceOf[Consumer]) None + else if (actorRef.remoteAddress.isDefined) None + else Some(Publish(actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true)) } diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java index f8ee893393..69a50a4bd2 100644 --- a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java +++ b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java @@ -7,28 +7,465 @@ public final class RemoteProtocol { public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } - public static final class RemoteRequest extends + public static final class ActorRefProtocol extends com.google.protobuf.GeneratedMessage { - // Use RemoteRequest.newBuilder() to construct. - private RemoteRequest() {} + // Use ActorRefProtocol.newBuilder() to construct. + private ActorRefProtocol() {} - private static final RemoteRequest defaultInstance = new RemoteRequest(); - public static RemoteRequest getDefaultInstance() { + private static final ActorRefProtocol defaultInstance = new ActorRefProtocol(); + public static ActorRefProtocol getDefaultInstance() { return defaultInstance; } - public RemoteRequest getDefaultInstanceForType() { + public ActorRefProtocol getDefaultInstanceForType() { return defaultInstance; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable; + } + + // required string uuid = 1; + public static final int UUID_FIELD_NUMBER = 1; + private boolean hasUuid; + private java.lang.String uuid_ = ""; + public boolean hasUuid() { return hasUuid; } + public java.lang.String getUuid() { return uuid_; } + + // required string actorClassName = 2; + public static final int ACTORCLASSNAME_FIELD_NUMBER = 2; + private boolean hasActorClassName; + private java.lang.String actorClassName_ = ""; + public boolean hasActorClassName() { return hasActorClassName; } + public java.lang.String getActorClassName() { return actorClassName_; } + + // required string sourceHostname = 3; + public static final int SOURCEHOSTNAME_FIELD_NUMBER = 3; + private boolean hasSourceHostname; + private java.lang.String sourceHostname_ = ""; + public boolean hasSourceHostname() { return hasSourceHostname; } + public java.lang.String getSourceHostname() { return sourceHostname_; } + + // required uint32 sourcePort = 4; + public static final int SOURCEPORT_FIELD_NUMBER = 4; + private boolean hasSourcePort; + private int sourcePort_ = 0; + public boolean hasSourcePort() { return hasSourcePort; } + public int getSourcePort() { return sourcePort_; } + + // required uint64 timeout = 5; + public static final int TIMEOUT_FIELD_NUMBER = 5; + private boolean hasTimeout; + private long timeout_ = 0L; + public boolean hasTimeout() { return hasTimeout; } + public long getTimeout() { return timeout_; } + + public final boolean isInitialized() { + if (!hasUuid) return false; + if (!hasActorClassName) return false; + if (!hasSourceHostname) return false; + if (!hasSourcePort) return false; + if (!hasTimeout) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (hasUuid()) { + output.writeString(1, getUuid()); + } + if (hasActorClassName()) { + output.writeString(2, getActorClassName()); + } + if (hasSourceHostname()) { + output.writeString(3, getSourceHostname()); + } + if (hasSourcePort()) { + output.writeUInt32(4, getSourcePort()); + } + if (hasTimeout()) { + output.writeUInt64(5, getTimeout()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasUuid()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(1, getUuid()); + } + if (hasActorClassName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getActorClassName()); + } + if (hasSourceHostname()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(3, getSourceHostname()); + } + if (hasSourcePort()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(4, getSourcePort()); + } + if (hasTimeout()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(5, getTimeout()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol result; + + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol(); + return builder; + } + + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDescriptor(); + } + + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) return this; + if (other.hasUuid()) { + setUuid(other.getUuid()); + } + if (other.hasActorClassName()) { + setActorClassName(other.getActorClassName()); + } + if (other.hasSourceHostname()) { + setSourceHostname(other.getSourceHostname()); + } + if (other.hasSourcePort()) { + setSourcePort(other.getSourcePort()); + } + if (other.hasTimeout()) { + setTimeout(other.getTimeout()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 10: { + setUuid(input.readString()); + break; + } + case 18: { + setActorClassName(input.readString()); + break; + } + case 26: { + setSourceHostname(input.readString()); + break; + } + case 32: { + setSourcePort(input.readUInt32()); + break; + } + case 40: { + setTimeout(input.readUInt64()); + break; + } + } + } + } + + + // required string uuid = 1; + public boolean hasUuid() { + return result.hasUuid(); + } + public java.lang.String getUuid() { + return result.getUuid(); + } + public Builder setUuid(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasUuid = true; + result.uuid_ = value; + return this; + } + public Builder clearUuid() { + result.hasUuid = false; + result.uuid_ = getDefaultInstance().getUuid(); + return this; + } + + // required string actorClassName = 2; + public boolean hasActorClassName() { + return result.hasActorClassName(); + } + public java.lang.String getActorClassName() { + return result.getActorClassName(); + } + public Builder setActorClassName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasActorClassName = true; + result.actorClassName_ = value; + return this; + } + public Builder clearActorClassName() { + result.hasActorClassName = false; + result.actorClassName_ = getDefaultInstance().getActorClassName(); + return this; + } + + // required string sourceHostname = 3; + public boolean hasSourceHostname() { + return result.hasSourceHostname(); + } + public java.lang.String getSourceHostname() { + return result.getSourceHostname(); + } + public Builder setSourceHostname(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasSourceHostname = true; + result.sourceHostname_ = value; + return this; + } + public Builder clearSourceHostname() { + result.hasSourceHostname = false; + result.sourceHostname_ = getDefaultInstance().getSourceHostname(); + return this; + } + + // required uint32 sourcePort = 4; + public boolean hasSourcePort() { + return result.hasSourcePort(); + } + public int getSourcePort() { + return result.getSourcePort(); + } + public Builder setSourcePort(int value) { + result.hasSourcePort = true; + result.sourcePort_ = value; + return this; + } + public Builder clearSourcePort() { + result.hasSourcePort = false; + result.sourcePort_ = 0; + return this; + } + + // required uint64 timeout = 5; + public boolean hasTimeout() { + return result.hasTimeout(); + } + public long getTimeout() { + return result.getTimeout(); + } + public Builder setTimeout(long value) { + result.hasTimeout = true; + result.timeout_ = value; + return this; + } + public Builder clearTimeout() { + result.hasTimeout = false; + result.timeout_ = 0L; + return this; + } + } + + static { + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.getDescriptor(); + } + + static { + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internalForceInit(); + } + } + + public static final class RemoteRequestProtocol extends + com.google.protobuf.GeneratedMessage { + // Use RemoteRequestProtocol.newBuilder() to construct. + private RemoteRequestProtocol() {} + + private static final RemoteRequestProtocol defaultInstance = new RemoteRequestProtocol(); + public static RemoteRequestProtocol getDefaultInstance() { + return defaultInstance; + } + + public RemoteRequestProtocol getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable; } // required uint64 id = 1; @@ -115,33 +552,12 @@ public final class RemoteProtocol { public boolean hasIsEscaped() { return hasIsEscaped; } public boolean getIsEscaped() { return isEscaped_; } - // optional string sourceHostname = 13; - public static final int SOURCEHOSTNAME_FIELD_NUMBER = 13; - private boolean hasSourceHostname; - private java.lang.String sourceHostname_ = ""; - public boolean hasSourceHostname() { return hasSourceHostname; } - public java.lang.String getSourceHostname() { return sourceHostname_; } - - // optional uint32 sourcePort = 14; - public static final int SOURCEPORT_FIELD_NUMBER = 14; - private boolean hasSourcePort; - private int sourcePort_ = 0; - public boolean hasSourcePort() { return hasSourcePort; } - public int getSourcePort() { return sourcePort_; } - - // optional string sourceTarget = 15; - public static final int SOURCETARGET_FIELD_NUMBER = 15; - private boolean hasSourceTarget; - private java.lang.String sourceTarget_ = ""; - public boolean hasSourceTarget() { return hasSourceTarget; } - public java.lang.String getSourceTarget() { return sourceTarget_; } - - // optional string sourceUuid = 16; - public static final int SOURCEUUID_FIELD_NUMBER = 16; - private boolean hasSourceUuid; - private java.lang.String sourceUuid_ = ""; - public boolean hasSourceUuid() { return hasSourceUuid; } - public java.lang.String getSourceUuid() { return sourceUuid_; } + // optional .se.scalablesolutions.akka.remote.protobuf.ActorRefProtocol sender = 13; + public static final int SENDER_FIELD_NUMBER = 13; + private boolean hasSender; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol sender_ = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); + public boolean hasSender() { return hasSender; } + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getSender() { return sender_; } public final boolean isInitialized() { if (!hasId) return false; @@ -153,6 +569,9 @@ public final class RemoteProtocol { if (!hasIsActor) return false; if (!hasIsOneWay) return false; if (!hasIsEscaped) return false; + if (hasSender()) { + if (!getSender().isInitialized()) return false; + } return true; } @@ -194,17 +613,8 @@ public final class RemoteProtocol { if (hasIsEscaped()) { output.writeBool(12, getIsEscaped()); } - if (hasSourceHostname()) { - output.writeString(13, getSourceHostname()); - } - if (hasSourcePort()) { - output.writeUInt32(14, getSourcePort()); - } - if (hasSourceTarget()) { - output.writeString(15, getSourceTarget()); - } - if (hasSourceUuid()) { - output.writeString(16, getSourceUuid()); + if (hasSender()) { + output.writeMessage(13, getSender()); } getUnknownFields().writeTo(output); } @@ -263,78 +673,66 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeBoolSize(12, getIsEscaped()); } - if (hasSourceHostname()) { + if (hasSender()) { size += com.google.protobuf.CodedOutputStream - .computeStringSize(13, getSourceHostname()); - } - if (hasSourcePort()) { - size += com.google.protobuf.CodedOutputStream - .computeUInt32Size(14, getSourcePort()); - } - if (hasSourceTarget()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(15, getSourceTarget()); - } - if (hasSourceUuid()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(16, getSourceUuid()); + .computeMessageSize(13, getSender()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom(byte[] data) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseDelimitedFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseDelimitedFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -344,25 +742,25 @@ public final class RemoteProtocol { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest prototype) { + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest result; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol result; - // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.newBuilder() + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.newBuilder() private Builder() {} private static Builder create() { Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol(); return builder; } - protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest internalGetResult() { + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol internalGetResult() { return result; } @@ -371,7 +769,7 @@ public final class RemoteProtocol { throw new IllegalStateException( "Cannot call clear() after build()."); } - result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest(); + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol(); return this; } @@ -381,24 +779,24 @@ public final class RemoteProtocol { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDescriptor(); + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDescriptor(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest getDefaultInstanceForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDefaultInstance(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDefaultInstance(); } public boolean isInitialized() { return result.isInitialized(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest build() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol build() { if (result != null && !isInitialized()) { throw newUninitializedMessageException(result); } return buildPartial(); } - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest buildParsed() + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { if (!isInitialized()) { throw newUninitializedMessageException( @@ -407,27 +805,27 @@ public final class RemoteProtocol { return buildPartial(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest buildPartial() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol buildPartial() { if (result == null) { throw new IllegalStateException( "build() has already been called on this Builder."); } - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest returnMe = result; + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol returnMe = result; result = null; return returnMe; } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest) { - return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest)other); + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest other) { - if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDefaultInstance()) return this; + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDefaultInstance()) return this; if (other.hasId()) { setId(other.getId()); } @@ -464,17 +862,8 @@ public final class RemoteProtocol { if (other.hasIsEscaped()) { setIsEscaped(other.getIsEscaped()); } - if (other.hasSourceHostname()) { - setSourceHostname(other.getSourceHostname()); - } - if (other.hasSourcePort()) { - setSourcePort(other.getSourcePort()); - } - if (other.hasSourceTarget()) { - setSourceTarget(other.getSourceTarget()); - } - if (other.hasSourceUuid()) { - setSourceUuid(other.getSourceUuid()); + if (other.hasSender()) { + mergeSender(other.getSender()); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -550,19 +939,12 @@ public final class RemoteProtocol { break; } case 106: { - setSourceHostname(input.readString()); - break; - } - case 112: { - setSourcePort(input.readUInt32()); - break; - } - case 122: { - setSourceTarget(input.readString()); - break; - } - case 130: { - setSourceUuid(input.readString()); + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder(); + if (hasSender()) { + subBuilder.mergeFrom(getSender()); + } + input.readMessage(subBuilder, extensionRegistry); + setSender(subBuilder.buildPartial()); break; } } @@ -804,84 +1186,40 @@ public final class RemoteProtocol { return this; } - // optional string sourceHostname = 13; - public boolean hasSourceHostname() { - return result.hasSourceHostname(); + // optional .se.scalablesolutions.akka.remote.protobuf.ActorRefProtocol sender = 13; + public boolean hasSender() { + return result.hasSender(); } - public java.lang.String getSourceHostname() { - return result.getSourceHostname(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getSender() { + return result.getSender(); } - public Builder setSourceHostname(java.lang.String value) { + public Builder setSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol value) { if (value == null) { - throw new NullPointerException(); - } - result.hasSourceHostname = true; - result.sourceHostname_ = value; + throw new NullPointerException(); + } + result.hasSender = true; + result.sender_ = value; return this; } - public Builder clearSourceHostname() { - result.hasSourceHostname = false; - result.sourceHostname_ = getDefaultInstance().getSourceHostname(); + public Builder setSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder builderForValue) { + result.hasSender = true; + result.sender_ = builderForValue.build(); return this; } - - // optional uint32 sourcePort = 14; - public boolean hasSourcePort() { - return result.hasSourcePort(); - } - public int getSourcePort() { - return result.getSourcePort(); - } - public Builder setSourcePort(int value) { - result.hasSourcePort = true; - result.sourcePort_ = value; + public Builder mergeSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol value) { + if (result.hasSender() && + result.sender_ != se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) { + result.sender_ = + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder(result.sender_).mergeFrom(value).buildPartial(); + } else { + result.sender_ = value; + } + result.hasSender = true; return this; } - public Builder clearSourcePort() { - result.hasSourcePort = false; - result.sourcePort_ = 0; - return this; - } - - // optional string sourceTarget = 15; - public boolean hasSourceTarget() { - return result.hasSourceTarget(); - } - public java.lang.String getSourceTarget() { - return result.getSourceTarget(); - } - public Builder setSourceTarget(java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasSourceTarget = true; - result.sourceTarget_ = value; - return this; - } - public Builder clearSourceTarget() { - result.hasSourceTarget = false; - result.sourceTarget_ = getDefaultInstance().getSourceTarget(); - return this; - } - - // optional string sourceUuid = 16; - public boolean hasSourceUuid() { - return result.hasSourceUuid(); - } - public java.lang.String getSourceUuid() { - return result.getSourceUuid(); - } - public Builder setSourceUuid(java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasSourceUuid = true; - result.sourceUuid_ = value; - return this; - } - public Builder clearSourceUuid() { - result.hasSourceUuid = false; - result.sourceUuid_ = getDefaultInstance().getSourceUuid(); + public Builder clearSender() { + result.hasSender = false; + result.sender_ = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); return this; } } @@ -895,28 +1233,28 @@ public final class RemoteProtocol { } } - public static final class RemoteReply extends + public static final class RemoteReplyProtocol extends com.google.protobuf.GeneratedMessage { - // Use RemoteReply.newBuilder() to construct. - private RemoteReply() {} + // Use RemoteReplyProtocol.newBuilder() to construct. + private RemoteReplyProtocol() {} - private static final RemoteReply defaultInstance = new RemoteReply(); - public static RemoteReply getDefaultInstance() { + private static final RemoteReplyProtocol defaultInstance = new RemoteReplyProtocol(); + public static RemoteReplyProtocol getDefaultInstance() { return defaultInstance; } - public RemoteReply getDefaultInstanceForType() { + public RemoteReplyProtocol getDefaultInstanceForType() { return defaultInstance; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable; } // required uint64 id = 1; @@ -1054,57 +1392,57 @@ public final class RemoteProtocol { return size; } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom(byte[] data) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseDelimitedFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseDelimitedFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -1114,25 +1452,25 @@ public final class RemoteProtocol { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply prototype) { + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply result; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol result; - // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.newBuilder() + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.newBuilder() private Builder() {} private static Builder create() { Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol(); return builder; } - protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply internalGetResult() { + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol internalGetResult() { return result; } @@ -1141,7 +1479,7 @@ public final class RemoteProtocol { throw new IllegalStateException( "Cannot call clear() after build()."); } - result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply(); + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol(); return this; } @@ -1151,24 +1489,24 @@ public final class RemoteProtocol { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDescriptor(); + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDescriptor(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply getDefaultInstanceForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDefaultInstance(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDefaultInstance(); } public boolean isInitialized() { return result.isInitialized(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply build() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol build() { if (result != null && !isInitialized()) { throw newUninitializedMessageException(result); } return buildPartial(); } - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply buildParsed() + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { if (!isInitialized()) { throw newUninitializedMessageException( @@ -1177,27 +1515,27 @@ public final class RemoteProtocol { return buildPartial(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply buildPartial() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol buildPartial() { if (result == null) { throw new IllegalStateException( "build() has already been called on this Builder."); } - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply returnMe = result; + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol returnMe = result; result = null; return returnMe; } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply) { - return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply)other); + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply other) { - if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDefaultInstance()) return this; + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDefaultInstance()) return this; if (other.hasId()) { setId(other.getId()); } @@ -1451,15 +1789,20 @@ public final class RemoteProtocol { } private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor; + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable; + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -1471,41 +1814,52 @@ public final class RemoteProtocol { java.lang.String[] descriptorData = { "\n>se/scalablesolutions/akka/remote/proto" + "buf/RemoteProtocol.proto\022)se.scalablesol" + - "utions.akka.remote.protobuf\"\272\002\n\rRemoteRe" + - "quest\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \002(\r\022\017\n\007m" + - "essage\030\003 \002(\014\022\027\n\017messageManifest\030\004 \001(\014\022\016\n" + - "\006method\030\005 \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004uuid\030\007 " + - "\002(\t\022\017\n\007timeout\030\010 \002(\004\022\026\n\016supervisorUuid\030\t" + - " \001(\t\022\017\n\007isActor\030\n \002(\010\022\020\n\010isOneWay\030\013 \002(\010\022" + - "\021\n\tisEscaped\030\014 \002(\010\022\026\n\016sourceHostname\030\r \001" + - "(\t\022\022\n\nsourcePort\030\016 \001(\r\022\024\n\014sourceTarget\030\017", - " \001(\t\022\022\n\nsourceUuid\030\020 \001(\t\"\247\001\n\013RemoteReply" + - "\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007messag" + - "e\030\003 \001(\014\022\027\n\017messageManifest\030\004 \001(\014\022\021\n\texce" + - "ption\030\005 \001(\t\022\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007i" + - "sActor\030\007 \002(\010\022\024\n\014isSuccessful\030\010 \002(\010" + "utions.akka.remote.protobuf\"u\n\020ActorRefP" + + "rotocol\022\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassName\030" + + "\002 \002(\t\022\026\n\016sourceHostname\030\003 \002(\t\022\022\n\nsourceP" + + "ort\030\004 \002(\r\022\017\n\007timeout\030\005 \002(\004\"\271\002\n\025RemoteReq" + + "uestProtocol\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \002" + + "(\r\022\017\n\007message\030\003 \002(\014\022\027\n\017messageManifest\030\004" + + " \001(\014\022\016\n\006method\030\005 \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004" + + "uuid\030\007 \002(\t\022\017\n\007timeout\030\010 \002(\004\022\026\n\016superviso", + "rUuid\030\t \001(\t\022\017\n\007isActor\030\n \002(\010\022\020\n\010isOneWay" + + "\030\013 \002(\010\022\021\n\tisEscaped\030\014 \002(\010\022K\n\006sender\030\r \001(" + + "\0132;.se.scalablesolutions.akka.remote.pro" + + "tobuf.ActorRefProtocol\"\257\001\n\023RemoteReplyPr" + + "otocol\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007" + + "message\030\003 \001(\014\022\027\n\017messageManifest\030\004 \001(\014\022\021" + + "\n\texception\030\005 \001(\t\022\026\n\016supervisorUuid\030\006 \001(" + + "\t\022\017\n\007isActor\030\007 \002(\010\022\024\n\014isSuccessful\030\010 \002(\010" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor = + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor = getDescriptor().getMessageTypes().get(0); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable = new + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor, - new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "SourceHostname", "SourcePort", "SourceTarget", "SourceUuid", }, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.class, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.Builder.class); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor = + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor, + new java.lang.String[] { "Uuid", "ActorClassName", "SourceHostname", "SourcePort", "Timeout", }, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder.class); + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor = getDescriptor().getMessageTypes().get(1); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable = new + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor, + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor, + new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "Sender", }, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.Builder.class); + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor = + getDescriptor().getMessageTypes().get(2); + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor, new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", }, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.class, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.Builder.class); + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.Builder.class); return null; } }; diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto index 79885bd043..372691cd8b 100644 --- a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto +++ b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto @@ -10,7 +10,15 @@ package se.scalablesolutions.akka.remote.protobuf; protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out . */ -message RemoteRequest { +message ActorRefProtocol { + required string uuid = 1; + required string actorClassName = 2; + required string sourceHostname = 3; + required uint32 sourcePort = 4; + required uint64 timeout = 5; +} + +message RemoteRequestProtocol { required uint64 id = 1; required uint32 protocol = 2; required bytes message = 3; @@ -23,13 +31,10 @@ message RemoteRequest { required bool isActor = 10; required bool isOneWay = 11; required bool isEscaped = 12; - optional string sourceHostname = 13; - optional uint32 sourcePort = 14; - optional string sourceTarget = 15; - optional string sourceUuid = 16; + optional ActorRefProtocol sender = 13; } -message RemoteReply { +message RemoteReplyProtocol { required uint64 id = 1; optional uint32 protocol = 2; optional bytes message = 3; diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 187d770a07..7a81953403 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -5,8 +5,8 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.config.FaultHandlingStrategy -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest -import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol +import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer @@ -284,9 +284,9 @@ object ActiveObject { actor.initialize(target, proxy) actor.timeout = timeout if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorId = new ActorRef(() => actor) - AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout)) - actorId.start + val actorRef = new ActorRef(() => actor) + AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) + actorRef.start proxy.asInstanceOf[T] } @@ -295,9 +295,9 @@ object ActiveObject { actor.initialize(target.getClass, target) actor.timeout = timeout if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorId = new ActorRef(() => actor) - AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout)) - actorId.start + val actorRef = new ActorRef(() => actor) + AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) + actorRef.start proxy.asInstanceOf[T] } @@ -388,10 +388,10 @@ private[akka] object AspectInitRegistry { private[akka] sealed case class AspectInit( val target: Class[_], - val actorId: ActorRef, + val actorRef: ActorRef, val remoteAddress: Option[InetSocketAddress], val timeout: Long) { - def this(target: Class[_], actorId: ActorRef, timeout: Long) = this(target, actorId, None, timeout) + def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout) } /** @@ -405,7 +405,7 @@ private[akka] sealed case class AspectInit( private[akka] sealed class ActiveObjectAspect { @volatile private var isInitialized = false private var target: Class[_] = _ - private var actorId: ActorRef = _ + private var actorRef: ActorRef = _ private var remoteAddress: Option[InetSocketAddress] = _ private var timeout: Long = _ @@ -414,7 +414,7 @@ private[akka] sealed class ActiveObjectAspect { if (!isInitialized) { val init = AspectInitRegistry.initFor(joinPoint.getThis) target = init.target - actorId = init.actorId + actorRef = init.actorRef remoteAddress = init.remoteAddress timeout = init.timeout isInitialized = true @@ -430,10 +430,10 @@ private[akka] sealed class ActiveObjectAspect { private def localDispatch(joinPoint: JoinPoint): AnyRef = { val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] if (isOneWay(rtti)) { - (actorId ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] + (actorRef ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] } else { - val result = actorId !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) + val result = actorRef !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) if (result.isDefined) result.get else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]") } @@ -443,17 +443,17 @@ private[akka] sealed class ActiveObjectAspect { val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] val oneWay_? = isOneWay(rtti) || isVoid(rtti) val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues) - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setMethod(rtti.getMethod.getName) .setTarget(target.getName) - .setUuid(actorId.uuid) + .setUuid(actorRef.uuid) .setTimeout(timeout) .setIsActor(false) .setIsOneWay(oneWay_?) .setIsEscaped(false) RemoteProtocolBuilder.setMessage(message, requestBuilder) - val id = actorId.actor.registerSupervisorAsRemoteActor + val id = actorRef.actor.registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) val remoteMessage = requestBuilder.build val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 381df9d535..76fcfa0f15 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -11,10 +11,10 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest -import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol +import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteActorProxy, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier @@ -228,24 +228,54 @@ object Actor extends Logging { } } - /** Starts the specified actor and returns it, useful for: - *
val actor = new FooActor
+  /** 
+   * Starts the specified actor and returns it, useful for simplifying code such as:
+   * 
+   *  val actor = new FooActor
    *  actor.start
-   *  //Gets replaced by
+   * 
+ * can be replaced with: + *
+   *  import Actor._
+   *
    *  val actor = start(new FooActor)
-   *  
+ *
*/ def start[T <: Actor](actor : T) : T = { actor.start actor } - +} + +/** + * The ActorRef object can be used to create ActorRef instances out of its binary + * protobuf based representation. + *
+ *   val actorRef = ActorRef.fromBinary(bytes)
+ *   actorRef ! message // send message to remote actor through its reference
+ * 
+ * + * @author Jonas Bonér + */ +object ActorRef { + def fromBinary(bytes: Array[Byte]): ActorRef = + fromProtocol(RemoteProtocol.ActorRefProtocol.newBuilder.mergeFrom(bytes).build) + + def fromProtocol(protocol: RemoteProtocol.ActorRefProtocol): ActorRef = + RemoteActorProxy( + protocol.getUuid, + protocol.getActorClassName, + protocol.getSourceHostname, + protocol.getSourcePort, + protocol.getTimeout) } /** * ActorRef is an immutable and serializable handle to an Actor. + *

* Create an ActorRef for an Actor by using the factory method on the Actor object. - * Here is an example: + *

+ * Here is an example on how to create an actor with a default constructor. *

  *   import Actor._
  * 
@@ -254,24 +284,23 @@ object Actor extends Logging {
  *   actor ! message
  *   actor.stop
  * 
+ * Here is an example on how to create an actor with a non-default constructor. + *
+ *   import Actor._
+ * 
+ *   val actor = newActor(() => new MyActor(...))
+ *   actor.start
+ *   actor ! message
+ *   actor.stop
+ * 
* * @author Jonas Bonér */ final class ActorRef private[akka] () { - private[akka] var newActorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) + private[akka] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) - private[akka] def this(clazz: Class[_ <: Actor]) = { - this() - newActorFactory = Left(Some(clazz)) - } - - private[akka] def this(factory: () => Actor) = { - this() - newActorFactory = Right(Some(factory)) - } - private[akka] lazy val actor: Actor = { - val actor = newActorFactory match { + val actor = actorFactory match { case Left(Some(clazz)) => try { clazz.newInstance @@ -290,6 +319,39 @@ final class ActorRef private[akka] () { actor } + private[akka] def this(clazz: Class[_ <: Actor]) = { + this() + actorFactory = Left(Some(clazz)) + } + + private[akka] def this(factory: () => Actor) = { + this() + actorFactory = Right(Some(factory)) + } + + def toProtocol: RemoteProtocol.ActorRefProtocol = { + val (host, port) = actor._replyToAddress.map(address => + (address.getHostName, address.getPort)) + .getOrElse((Actor.HOSTNAME, Actor.PORT)) + + if (!actor._registeredInRemoteNodeDuringSerialization) { + Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) + if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port) + RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this) + actor._registeredInRemoteNodeDuringSerialization = true + } + + RemoteProtocol.ActorRefProtocol.newBuilder + .setUuid(uuid) + .setActorClassName(actorClass.getName) + .setSourceHostname(host) + .setSourcePort(port) + .setTimeout(timeout) + .build + } + + def toBinary: Array[Byte] = toProtocol.toByteArray + /** * Returns the class for the Actor instance that is managed by the ActorRef. */ @@ -397,7 +459,7 @@ final class ActorRef private[akka] () { * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: Any): Option[T] = !![T](message, actor.timeout) + def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, actor.timeout) /** * Sends a message asynchronously returns a future holding the eventual reply message. @@ -425,7 +487,7 @@ final class ActorRef private[akka] () { if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (actor.isRunning) { sender.get.actor.replyTo match { - case Some(Left(actorID)) => actor.postMessageToMailbox(message, Some(actorID)) + case Some(Left(actorRef)) => actor.postMessageToMailbox(message, Some(actorRef)) case Some(Right(future)) => actor.postMessageToMailboxAndCreateFutureResultWithTimeout(message, actor.timeout, Some(future)) case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") } @@ -546,6 +608,7 @@ trait Actor extends TransactionManagement with Logging { @volatile private[this] var _isSuspended = true @volatile private[this] var _isShutDown = false @volatile private[akka] var _isKilled = false + @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false private var _hotswap: Option[PartialFunction[Any, Unit]] = None private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _linkedActors: Option[HashSet[ActorRef]] = None @@ -755,6 +818,7 @@ trait Actor extends TransactionManagement with Logging { shutdown ActorRegistry.unregister(self) _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) + RemoteNode.unregister(self) } } @@ -849,7 +913,8 @@ trait Actor extends TransactionManagement with Logging { /** - * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists. + * Set the contact address for this actor. This is used for replying to messages sent + * asynchronously when no reply channel exists. */ def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port)) @@ -879,12 +944,12 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def link(actorId: ActorRef) = { - if (actorId.supervisor.isDefined) throw new IllegalStateException( - "Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails") - getLinkedActors.add(actorId) - actorId.supervisor = Some(self) - Actor.log.debug("Linking actor [%s] to actor [%s]", actorId, this) + protected[this] def link(actorRef: ActorRef) = { + if (actorRef.supervisor.isDefined) throw new IllegalStateException( + "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") + getLinkedActors.add(actorRef) + actorRef.supervisor = Some(self) + Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this) } /** @@ -892,12 +957,12 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def unlink(actorId: ActorRef) = { - if (!getLinkedActors.contains(actorId)) throw new IllegalStateException( - "Actor [" + actorId + "] is not a linked actor, can't unlink") - getLinkedActors.remove(actorId) - actorId.supervisor = None - Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorId, this) + protected[this] def unlink(actorRef: ActorRef) = { + if (!getLinkedActors.contains(actorRef)) throw new IllegalStateException( + "Actor [" + actorRef + "] is not a linked actor, can't unlink") + getLinkedActors.remove(actorRef) + actorRef.supervisor = None + Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this) } /** @@ -905,11 +970,11 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def startLink(actorId: ActorRef) = { + protected[this] def startLink(actorRef: ActorRef) = { try { - actorId.start + actorRef.start } finally { - link(actorId) + link(actorRef) } } @@ -918,12 +983,12 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def startLinkRemote(actorId: ActorRef, hostname: String, port: Int) = { + protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = { try { - actorId.makeRemote(hostname, port) - actorId.start + actorRef.makeRemote(hostname, port) + actorRef.start } finally { - link(actorId) + link(actorRef) } } @@ -933,9 +998,9 @@ trait Actor extends TransactionManagement with Logging { * To be invoked from within the actor itself. */ protected[this] def spawn[T <: Actor : Manifest]: ActorRef = { - val actorId = spawnButDoNotStart[T] - actorId.start - actorId + val actorRef = spawnButDoNotStart[T] + actorRef.start + actorRef } /** @@ -1007,15 +1072,15 @@ trait Actor extends TransactionManagement with Logging { new ActorRef(() => actor) } - protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorRef]): Unit = { + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) - .setUuid(this.id) + .setUuid(this.uuid) .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) @@ -1023,30 +1088,12 @@ trait Actor extends TransactionManagement with Logging { val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - // set the source fields used to reply back to the original sender - // (i.e. not the remote proxy actor) - if (sender.isDefined) { - val s = sender.get.actor - requestBuilder.setSourceTarget(s.getClass.getName) - requestBuilder.setSourceUuid(s.uuid) + senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) - val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - - Actor.log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) - - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - - if (RemoteServer.serverFor(host, port).isEmpty) { - val server = new RemoteServer - server.start(host, port) - } - RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.id, sender.get) - } RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) } else { - val invocation = new MessageInvocation(self, message, sender.map(Left(_)), transactionSet.get) + val invocation = new MessageInvocation(self, message, senderOption.map(Left(_)), transactionSet.get) if (messageDispatcher.usesActorMailbox) { _mailbox.add(invocation) if (_isSuspended) invocation.send @@ -1062,28 +1109,30 @@ trait Actor extends TransactionManagement with Logging { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) - .setUuid(this.id) + .setUuid(this.uuid) .setIsActor(true) .setIsOneWay(false) .setIsEscaped(false) + + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) + val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) + val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](timeout) - val invocation = new MessageInvocation(self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) - - if (messageDispatcher.usesActorMailbox) - _mailbox.add(invocation) - + val invocation = new MessageInvocation( + self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) + if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) invocation.send future } @@ -1217,8 +1266,8 @@ trait Actor extends TransactionManagement with Logging { private[this] def restartLinkedActors(reason: Throwable) = { getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { - actorId => - val actor = actorId.actor + actorRef => + val actor = actorRef.actor if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent)) actor.lifeCycle.get match { case LifeCycle(scope, _) => { @@ -1228,7 +1277,7 @@ trait Actor extends TransactionManagement with Logging { case Temporary => Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id) actor.stop - getLinkedActors.remove(actorId) // remove the temporary actor + getLinkedActors.remove(actorRef) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor if (getLinkedActors.isEmpty) { Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" + @@ -1314,7 +1363,7 @@ object DispatcherType { * * @author Jonas Bonér */ -class ActorMessageInvoker private[akka] (val actorId: ActorRef) extends MessageInvoker { - def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle) +class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker { + def invoke(handle: MessageInvocation) = actorRef.actor.invoke(handle) } diff --git a/akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto b/akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index c72c588937..c52c59b2ae 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -57,9 +57,9 @@ object ActorRegistry extends Logging { val all = new ListBuffer[ActorRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { - val actorId = elements.nextElement - if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) { - all += actorId + val actorRef = elements.nextElement + if (manifest.erasure.isAssignableFrom(actorRef.actor.getClass)) { + all += actorRef } } all.toList @@ -92,24 +92,24 @@ object ActorRegistry extends Logging { /** * Registers an actor in the ActorRegistry. */ - def register(actorId: ActorRef) = { + def register(actorRef: ActorRef) = { // UUID - actorsByUUID.put(actorId.uuid, actorId) + actorsByUUID.put(actorRef.uuid, actorRef) // ID - val id = actorId.id - if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId) - if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id)) - else actorsById.put(id, actorId :: Nil) + val id = actorRef.id + if (id eq null) throw new IllegalStateException("Actor.id is null " + actorRef) + if (actorsById.containsKey(id)) actorsById.put(id, actorRef :: actorsById.get(id)) + else actorsById.put(id, actorRef :: Nil) // Class name - val className = actorId.actor.getClass.getName + val className = actorRef.actor.getClass.getName if (actorsByClassName.containsKey(className)) { - actorsByClassName.put(className, actorId :: actorsByClassName.get(className)) - } else actorsByClassName.put(className, actorId :: Nil) + actorsByClassName.put(className, actorRef :: actorsByClassName.get(className)) + } else actorsByClassName.put(className, actorRef :: Nil) // notify listeners - foreachListener(_ ! ActorRegistered(actorId)) + foreachListener(_ ! ActorRegistered(actorRef)) } /** diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index 905cd6c8fe..2b31bea60c 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -58,9 +58,9 @@ object Scheduler extends Actor { def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - def stopSupervising(actorId: ActorRef) = { - unlink(actorId) - schedulers.remove(actorId) + def stopSupervising(actorRef: ActorRef) = { + unlink(actorRef) + schedulers.remove(actorRef) } override def shutdown = { diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index f4b7b2b012..c0023ae44b 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -103,9 +103,9 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep override def stop = synchronized { super[Actor].stop - getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorId => - actorId.stop - log.info("Shutting actor down: %s", actorId) + getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef => + actorRef.stop + log.info("Shutting actor down: %s", actorRef) } log.info("Stopping supervisor: %s", this) } @@ -119,19 +119,19 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep case SupervisorConfig(_, servers) => servers.map(server => server match { - case Supervise(actorId, lifeCycle, remoteAddress) => - val className = actorId.actor.getClass.getName + case Supervise(actorRef, lifeCycle, remoteAddress) => + val className = actorRef.actor.getClass.getName val currentActors = { val list = actors.get(className) if (list eq null) List[ActorRef]() else list } - actors.put(className, actorId :: currentActors) - actorId.actor.lifeCycle = Some(lifeCycle) - startLink(actorId) + actors.put(className, actorRef :: currentActors) + actorRef.actor.lifeCycle = Some(lifeCycle) + startLink(actorRef) remoteAddress.foreach(address => RemoteServer.actorsFor( RemoteServer.Address(address.hostname, address.port)) - .actors.put(actorId.id, actorId)) + .actors.put(actorRef.id, actorRef)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val supervisor = { diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index 7e1daa5935..7f1cd308b3 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -25,13 +25,13 @@ object ScalaConfig { case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server - class Supervise(val actorId: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) } object Supervise { - def apply(actorId: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress) - def apply(actorId: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null) - def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress)) + def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) + def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) + def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress)) } case class RestartStrategy( @@ -227,8 +227,8 @@ object JavaConfig { intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher, if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null) - def newSupervised(actorId: ActorRef) = - se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform) + def newSupervised(actorRef: ActorRef) = + se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) } } \ No newline at end of file diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index f5fd490df9..8d87272ef0 100644 --- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -18,14 +18,14 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten def dispatch(invocation: MessageInvocation) = queue.append(invocation) - override def register(actorId: ActorRef) = synchronized { - messageInvokers.put(actorId, new ActorMessageInvoker(actorId)) - super.register(actorId) + override def register(actorRef: ActorRef) = synchronized { + messageInvokers.put(actorRef, new ActorMessageInvoker(actorRef)) + super.register(actorRef) } - override def unregister(actorId: ActorRef) = synchronized { - messageInvokers.remove(actorId) - super.unregister(actorId) + override def unregister(actorRef: ActorRef) = synchronized { + messageInvokers.remove(actorRef) + super.unregister(actorRef) } def shutdown = if (active) { diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 23adfed05f..4edf6651c0 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -31,7 +31,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder { @volatile private var active: Boolean = false - implicit def actorId2actor(actorId: ActorRef): Actor = actorId.actor + implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor /** Type of the actors registered in this dispatcher. */ private var actorType:Option[Class[_]] = None @@ -193,15 +193,15 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool - override def register(actorId: ActorRef) = { - verifyActorsAreOfSameType(actorId) - pooledActors.add(actorId) - super.register(actorId) + override def register(actorRef: ActorRef) = { + verifyActorsAreOfSameType(actorRef) + pooledActors.add(actorRef) + super.register(actorRef) } - override def unregister(actorId: ActorRef) = { - pooledActors.remove(actorId) - super.unregister(actorId) + override def unregister(actorRef: ActorRef) = { + pooledActors.remove(actorRef) + super.unregister(actorRef) } def usesActorMailbox = true diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 9d5c049495..6c9fc0f842 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -60,9 +60,9 @@ trait MessageDispatcher extends Logging { def dispatch(invocation: MessageInvocation) def start def shutdown - def register(actorId: ActorRef) = references.put(actorId.uuid, actorId) - def unregister(actorId: ActorRef) = { - references.remove(actorId.uuid) + def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef) + def unregister(actorRef: ActorRef) = { + references.remove(actorRef.uuid) if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index f977260cf2..38e068b9a1 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.remote -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol} import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.util.{UUID, Logging} @@ -31,7 +31,7 @@ import scala.collection.mutable.{HashSet, HashMap} * * @author Jonas Bonér */ -object RemoteRequestIdFactory { +object RemoteRequestProtocolIdFactory { private val nodeId = UUID.newUuid private val id = new AtomicLong @@ -64,23 +64,15 @@ private[akka] class RemoteActorProxy private ( val remoteClient = RemoteClient.clientFor(hostname, port) override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(className) .setTimeout(timeOut) .setUuid(uuid) .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) - if (senderOption.isDefined) { - val sender = senderOption.get.actor - requestBuilder.setSourceTarget(sender.getClass.getName) - requestBuilder.setSourceUuid(sender.uuid) - val (host, port) = sender._replyToAddress.map(address => - (address.getHostName, address.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - } + senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) remoteClient.send[Any](requestBuilder.build, None) } @@ -89,14 +81,15 @@ private[akka] class RemoteActorProxy private ( message: Any, timeout: Long, senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(className) .setTimeout(timeout) .setUuid(uuid) .setIsActor(true) .setIsOneWay(false) .setIsEscaped(false) + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) val future = remoteClient.send(requestBuilder.build, senderFuture) if (future.isDefined) future.get @@ -121,14 +114,14 @@ object RemoteClient extends Logging { def actorFor(className: String, hostname: String, port: Int): ActorRef = actorFor(className, className, 5000L, hostname, port) - def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorRef = - actorFor(actorId, className, 5000L, hostname, port) + def actorFor(actorRef: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(actorRef, className, 5000L, hostname, port) def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef = actorFor(className, className, timeout, hostname, port) - def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = - RemoteActorProxy(actorId, className, hostname, port, timeout) + def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + RemoteActorProxy(actorRef, className, hostname, port, timeout) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) @@ -237,7 +230,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { } } - def send[T](request: RemoteRequest, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { + def send[T](request: RemoteRequestProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { if (request.getIsOneWay) { connection.getChannel.write(request) None @@ -256,17 +249,17 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { throw exception } - def registerSupervisorForActor(actorId: ActorRef) = - if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision") - else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId) + def registerSupervisorForActor(actorRef: ActorRef) = + if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorRef + " since it is not under supervision") + else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) - def deregisterSupervisorForActor(actorId: ActorRef) = - if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision") - else supervisors.remove(actorId.supervisor.get.uuid) + def deregisterSupervisorForActor(actorRef: ActorRef) = + if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision") + else supervisors.remove(actorRef.supervisor.get.uuid) - def registerListener(actorId: ActorRef) = listeners.add(actorId) + def registerListener(actorRef: ActorRef) = listeners.add(actorRef) - def deregisterListener(actorId: ActorRef) = listeners.remove(actorId) + def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef) } /** @@ -283,7 +276,7 @@ class RemoteClientPipelineFactory(name: String, val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance) + val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder val zipCodec = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) @@ -323,9 +316,9 @@ class RemoteClientHandler(val name: String, override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { try { val result = event.getMessage - if (result.isInstanceOf[RemoteReply]) { - val reply = result.asInstanceOf[RemoteReply] - log.debug("Remote client received RemoteReply[\n%s]", reply.toString) + if (result.isInstanceOf[RemoteReplyProtocol]) { + val reply = result.asInstanceOf[RemoteReplyProtocol] + log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString) val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]] if (reply.getIsSuccessful) { val message = RemoteProtocolBuilder.getMessage(reply) @@ -388,7 +381,7 @@ class RemoteClientHandler(val name: String, event.getChannel.close } - private def parseException(reply: RemoteReply) = { + private def parseException(reply: RemoteReplyProtocol) = { val exception = reply.getException val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$'))) val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length) diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index b95ac210f5..b9c446d3d3 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.serialization.Serializable.SBinary import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol} -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol} import com.google.protobuf.{Message, ByteString} @@ -23,7 +23,7 @@ object RemoteProtocolBuilder { SERIALIZER_SCALA_JSON.classLoader = Some(cl) } - def getMessage(request: RemoteRequest): Any = { + def getMessage(request: RemoteRequestProtocol): Any = { request.getProtocol match { case SerializationProtocol.JAVA => unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) @@ -42,7 +42,7 @@ object RemoteProtocolBuilder { } } - def getMessage(reply: RemoteReply): Any = { + def getMessage(reply: RemoteReplyProtocol): Any = { reply.getProtocol match { case SerializationProtocol.JAVA => unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) @@ -61,7 +61,7 @@ object RemoteProtocolBuilder { } } - def setMessage(message: Any, builder: RemoteRequest.Builder) = { + def setMessage(message: Any, builder: RemoteRequestProtocol.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) @@ -89,7 +89,7 @@ object RemoteProtocolBuilder { } } - def setMessage(message: Any, builder: RemoteReply.Builder) = { + def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 7b3ad3ca03..6703d677c0 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -11,7 +11,7 @@ import java.util.{Map => JMap} import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.util._ -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteReply, RemoteRequest} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._ import se.scalablesolutions.akka.config.Config.config import org.jboss.netty.bootstrap.ServerBootstrap @@ -199,25 +199,54 @@ class RemoteServer extends Logging { /** * Register Remote Actor by the Actor's 'id' field. */ - def register(actor: ActorRef) = synchronized { + def register(actorRef: ActorRef) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) + log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actorRef.id, actorRef) } } /** * Register Remote Actor by a specific 'id' passed as argument. + *

+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ - def register(id: String, actor: ActorRef) = synchronized { + def register(id: String, actorRef: ActorRef) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id) - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) + log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actorRef) + } + } + + /** + * Unregister Remote Actor that is registered using its 'id' field (not custom ID). + */ + def unregister(actorRef: ActorRef) = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) + val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) + server.actors.remove(actorRef.id) + if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) + } + } + + /** + * Unregister Remote Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregister(id: String) = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote actor with id [%s]", id) + val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) + val actorRef = server.actors.get(id) + server.actors.remove(id) + if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) } } } -case class Codec(encoder : ChannelHandler, decoder : ChannelHandler) +case class Codec(encoder: ChannelHandler, decoder: ChannelHandler) /** * @author Jonas Bonér @@ -233,7 +262,7 @@ class RemoteServerPipelineFactory( def getPipeline: ChannelPipeline = { val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteRequest.getDefaultInstance) + val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder val zipCodec = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) @@ -283,50 +312,37 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { val message = event.getMessage if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event) - if (message.isInstanceOf[RemoteRequest]) { - handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel) + if (message.isInstanceOf[RemoteRequestProtocol]) { + handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel) } } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + event.getCause.printStackTrace log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close } - private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = { - log.debug("Received RemoteRequest[\n%s]", request.toString) + private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { + log.debug("Received RemoteRequestProtocol[\n%s]", request.toString) if (request.getIsActor) dispatchToActor(request, channel) else dispatchToActiveObject(request, channel) } - private def dispatchToActor(request: RemoteRequest, channel: Channel) = { - log.debug("Dispatching to remote actor [%s]", request.getTarget) - val actorId = createActor(request.getTarget, request.getUuid, request.getTimeout) - actorId.start - + private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = { + log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid) + val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout) + actorRef.start val message = RemoteProtocolBuilder.getMessage(request) if (request.getIsOneWay) { - if (request.hasSourceHostname && request.hasSourcePort) { - // re-create the sending actor - val targetClass = if (request.hasSourceTarget) request.getSourceTarget - else request.getTarget - - val remoteActorId = createActor(targetClass, request.getSourceUuid, request.getTimeout) - if (!remoteActorId.isRunning) { - remoteActorId.makeRemote(request.getSourceHostname, request.getSourcePort) - remoteActorId.start - } - actorId.!(message)(Some(remoteActorId)) - } else { - // couldn't find a way to reply, send the message without a source/sender - actorId ! message - } + val sender = request.getSender + if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender))) } else { try { - val resultOrNone = actorId !! message + val resultOrNone = actorRef !! message val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null log.debug("Returning result from actor invocation [%s]", result) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setIsSuccessful(true) .setIsActor(true) @@ -337,7 +353,7 @@ class RemoteServerHandler( } catch { case e: Throwable => log.error(e, "Could not invoke remote actor [%s]", request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getClass.getName + "$" + e.getMessage) .setIsSuccessful(false) @@ -349,7 +365,7 @@ class RemoteServerHandler( } } - private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = { + private def dispatchToActiveObject(request: RemoteRequestProtocol, channel: Channel) = { log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget) val activeObject = createActiveObject(request.getTarget, request.getTimeout) @@ -365,7 +381,7 @@ class RemoteServerHandler( else { val result = messageReceiver.invoke(activeObject, unescapedArgs: _*) log.debug("Returning result from remote active object invocation [%s]", result) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setIsSuccessful(true) .setIsActor(false) @@ -377,7 +393,7 @@ class RemoteServerHandler( } catch { case e: InvocationTargetException => log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage) .setIsSuccessful(false) @@ -387,7 +403,7 @@ class RemoteServerHandler( channel.write(replyMessage) case e: Throwable => log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getClass.getName + "$" + e.getMessage) .setIsSuccessful(false) @@ -442,8 +458,9 @@ class RemoteServerHandler( * Does not start the actor. */ private def createActor(name: String, uuid: String, timeout: Long): ActorRef = { - val actorIdOrNull = actors.get(uuid) - if (actorIdOrNull eq null) { + val actorRefOrNull = actors.get(uuid) + println("----------- ACTOR " + actorRefOrNull + " " + uuid) + if (actorRefOrNull eq null) { try { log.info("Creating a new remote actor [%s:%s]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) @@ -452,14 +469,14 @@ class RemoteServerHandler( newInstance._uuid = uuid newInstance.timeout = timeout newInstance._remoteAddress = None - val actorId = new ActorRef(() => newInstance) - actors.put(uuid, actorId) - actorId + val actorRef = new ActorRef(() => newInstance) + actors.put(uuid, actorRef) + actorRef } catch { case e => log.error(e, "Could not create remote actor instance") throw e } - } else actorIdOrNull + } else actorRefOrNull } } diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala index 450c179163..63f27a631d 100644 --- a/akka-core/src/main/scala/stm/TransactionalState.scala +++ b/akka-core/src/main/scala/stm/TransactionalState.scala @@ -171,7 +171,7 @@ class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional { * Necessary to keep from being implicitly converted to Iterable in for comprehensions. */ def withFilter(p: T => Boolean): WithFilter = new WithFilter(p) - + class WithFilter(p: T => Boolean) { def map[B](f: T => B): TransactionalRef[B] = self filter p map f def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f diff --git a/akka-core/src/test/scala/FutureSpec.scala b/akka-core/src/test/scala/FutureSpec.scala index 59822d9ea7..4f9da6572f 100644 --- a/akka-core/src/test/scala/FutureSpec.scala +++ b/akka-core/src/test/scala/FutureSpec.scala @@ -20,7 +20,7 @@ object FutureSpec { class FutureSpec extends JUnitSuite { import FutureSpec._ - @Test def shouldActorReplyResultThroughExplicitFuture = { + @Test def shouldActorReplyResultThroughExplicitFuture { val actor = newActor[TestActor] actor.start val future = actor !!! "Hello" @@ -30,7 +30,7 @@ class FutureSpec extends JUnitSuite { actor.stop } - @Test def shouldActorReplyExceptionThroughExplicitFuture = { + @Test def shouldActorReplyExceptionThroughExplicitFuture { val actor = newActor[TestActor] actor.start val future = actor !!! "Failure" diff --git a/akka-core/src/test/scala/MemoryFootprintSpec.scala b/akka-core/src/test/scala/MemoryFootprintSpec.scala deleted file mode 100644 index 9efb8270cb..0000000000 --- a/akka-core/src/test/scala/MemoryFootprintSpec.scala +++ /dev/null @@ -1,36 +0,0 @@ -package se.scalablesolutions.akka.actor - -import org.scalatest.junit.JUnitSuite -import org.junit.Test -import Actor._ - -class MemoryFootprintSpec extends JUnitSuite { - class Mem extends Actor { - def receive = { - case _ => {} - } - } - - val NR_OF_ACTORS = 100000 - val MAX_MEMORY_FOOTPRINT_PER_ACTOR = 700 - - @Test - def actorsShouldHaveLessMemoryFootprintThan700Bytes = { - println("============== MEMORY FOOTPRINT TEST ==============") - // warm up - (1 until 10000).foreach(i => new Mem) - - // Actors are put in AspectRegistry when created so they won't be GCd here - - val totalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory - println("Memory before " + totalMem) - (1 until NR_OF_ACTORS).foreach(i => new Mem) - - val newTotalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory - println("Memory aftor " + newTotalMem) - val memPerActor = (newTotalMem - totalMem) / NR_OF_ACTORS - - println("Memory footprint per actor is : " + memPerActor) - assert(memPerActor < MAX_MEMORY_FOOTPRINT_PER_ACTOR) // memory per actor should be less than 630 bytes - } -} diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala index 6ecefca1e3..51d1e0876f 100644 --- a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala @@ -105,7 +105,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendRemoteReply = { + def shouldSendRemoteReplyProtocol = { implicit val timeout = 500000000L val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", diff --git a/akka-core/src/test/scala/TransactionalRefSpec.scala b/akka-core/src/test/scala/TransactionalRefSpec.scala index 897e9583e0..14a0a8f186 100644 --- a/akka-core/src/test/scala/TransactionalRefSpec.scala +++ b/akka-core/src/test/scala/TransactionalRefSpec.scala @@ -98,9 +98,9 @@ class TransactionalRefSpec extends Spec with ShouldMatchers { var result = 0 atomic { - for (value <- ref) { - result += value - } + for (value <- ref) { + result += value + } } result should be(3)