diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java index 648f53842b..9af73c6c77 100644 --- a/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java +++ b/akka-core/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java @@ -667,29 +667,36 @@ public final class RemoteProtocol { public boolean hasTimeout() { return hasTimeout; } public long getTimeout() { return timeout_; } - // optional .LifeCycleProtocol lifeCycle = 9; - public static final int LIFECYCLE_FIELD_NUMBER = 9; + // optional uint64 receiveTimeout = 9; + public static final int RECEIVETIMEOUT_FIELD_NUMBER = 9; + private boolean hasReceiveTimeout; + private long receiveTimeout_ = 0L; + public boolean hasReceiveTimeout() { return hasReceiveTimeout; } + public long getReceiveTimeout() { return receiveTimeout_; } + + // optional .LifeCycleProtocol lifeCycle = 10; + public static final int LIFECYCLE_FIELD_NUMBER = 10; private boolean hasLifeCycle; private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol lifeCycle_; public boolean hasLifeCycle() { return hasLifeCycle; } public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol getLifeCycle() { return lifeCycle_; } - // optional .RemoteActorRefProtocol supervisor = 10; - public static final int SUPERVISOR_FIELD_NUMBER = 10; + // optional .RemoteActorRefProtocol supervisor = 11; + public static final int SUPERVISOR_FIELD_NUMBER = 11; private boolean hasSupervisor; private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol supervisor_; public boolean hasSupervisor() { return hasSupervisor; } public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getSupervisor() { return supervisor_; } - // optional bytes hotswapStack = 11; - public static final int HOTSWAPSTACK_FIELD_NUMBER = 11; + // optional bytes hotswapStack = 12; + public static final int HOTSWAPSTACK_FIELD_NUMBER = 12; private boolean hasHotswapStack; private com.google.protobuf.ByteString hotswapStack_ = com.google.protobuf.ByteString.EMPTY; public boolean hasHotswapStack() { return hasHotswapStack; } public com.google.protobuf.ByteString getHotswapStack() { return hotswapStack_; } - // repeated .RemoteRequestProtocol messages = 12; - public static final int MESSAGES_FIELD_NUMBER = 12; + // repeated .RemoteRequestProtocol messages = 13; + public static final int MESSAGES_FIELD_NUMBER = 13; private java.util.List messages_ = java.util.Collections.emptyList(); public java.util.List getMessagesList() { @@ -750,17 +757,20 @@ public final class RemoteProtocol { if (hasTimeout()) { output.writeUInt64(8, getTimeout()); } + if (hasReceiveTimeout()) { + output.writeUInt64(9, getReceiveTimeout()); + } if (hasLifeCycle()) { - output.writeMessage(9, getLifeCycle()); + output.writeMessage(10, getLifeCycle()); } if (hasSupervisor()) { - output.writeMessage(10, getSupervisor()); + output.writeMessage(11, getSupervisor()); } if (hasHotswapStack()) { - output.writeBytes(11, getHotswapStack()); + output.writeBytes(12, getHotswapStack()); } for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol element : getMessagesList()) { - output.writeMessage(12, element); + output.writeMessage(13, element); } getUnknownFields().writeTo(output); } @@ -803,21 +813,25 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(8, getTimeout()); } + if (hasReceiveTimeout()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(9, getReceiveTimeout()); + } if (hasLifeCycle()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(9, getLifeCycle()); + .computeMessageSize(10, getLifeCycle()); } if (hasSupervisor()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(10, getSupervisor()); + .computeMessageSize(11, getSupervisor()); } if (hasHotswapStack()) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(11, getHotswapStack()); + .computeBytesSize(12, getHotswapStack()); } for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol element : getMessagesList()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(12, element); + .computeMessageSize(13, element); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -1005,6 +1019,9 @@ public final class RemoteProtocol { if (other.hasTimeout()) { setTimeout(other.getTimeout()); } + if (other.hasReceiveTimeout()) { + setReceiveTimeout(other.getReceiveTimeout()); + } if (other.hasLifeCycle()) { mergeLifeCycle(other.getLifeCycle()); } @@ -1082,7 +1099,11 @@ public final class RemoteProtocol { setTimeout(input.readUInt64()); break; } - case 74: { + case 72: { + setReceiveTimeout(input.readUInt64()); + break; + } + case 82: { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.newBuilder(); if (hasLifeCycle()) { subBuilder.mergeFrom(getLifeCycle()); @@ -1091,7 +1112,7 @@ public final class RemoteProtocol { setLifeCycle(subBuilder.buildPartial()); break; } - case 82: { + case 90: { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder(); if (hasSupervisor()) { subBuilder.mergeFrom(getSupervisor()); @@ -1100,11 +1121,11 @@ public final class RemoteProtocol { setSupervisor(subBuilder.buildPartial()); break; } - case 90: { + case 98: { setHotswapStack(input.readBytes()); break; } - case 98: { + case 106: { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.newBuilder(); input.readMessage(subBuilder, extensionRegistry); addMessages(subBuilder.buildPartial()); @@ -1293,7 +1314,25 @@ public final class RemoteProtocol { return this; } - // optional .LifeCycleProtocol lifeCycle = 9; + // optional uint64 receiveTimeout = 9; + public boolean hasReceiveTimeout() { + return result.hasReceiveTimeout(); + } + public long getReceiveTimeout() { + return result.getReceiveTimeout(); + } + public Builder setReceiveTimeout(long value) { + result.hasReceiveTimeout = true; + result.receiveTimeout_ = value; + return this; + } + public Builder clearReceiveTimeout() { + result.hasReceiveTimeout = false; + result.receiveTimeout_ = 0L; + return this; + } + + // optional .LifeCycleProtocol lifeCycle = 10; public boolean hasLifeCycle() { return result.hasLifeCycle(); } @@ -1330,7 +1369,7 @@ public final class RemoteProtocol { return this; } - // optional .RemoteActorRefProtocol supervisor = 10; + // optional .RemoteActorRefProtocol supervisor = 11; public boolean hasSupervisor() { return result.hasSupervisor(); } @@ -1367,7 +1406,7 @@ public final class RemoteProtocol { return this; } - // optional bytes hotswapStack = 11; + // optional bytes hotswapStack = 12; public boolean hasHotswapStack() { return result.hasHotswapStack(); } @@ -1388,7 +1427,7 @@ public final class RemoteProtocol { return this; } - // repeated .RemoteRequestProtocol messages = 12; + // repeated .RemoteRequestProtocol messages = 13; public java.util.List getMessagesList() { return java.util.Collections.unmodifiableList(result.messages_); } @@ -4210,40 +4249,40 @@ public final class RemoteProtocol { "\n\024RemoteProtocol.proto\"v\n\026RemoteActorRef" + "Protocol\022\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassname" + "\030\002 \002(\t\022%\n\013homeAddress\030\003 \002(\0132\020.AddressPro" + - "tocol\022\017\n\007timeout\030\004 \001(\004\"\350\002\n\032SerializedAct" + + "tocol\022\017\n\007timeout\030\004 \001(\004\"\200\003\n\032SerializedAct" + "orRefProtocol\022\014\n\004uuid\030\001 \002(\t\022\n\n\002id\030\002 \002(\t\022" + "\026\n\016actorClassname\030\003 \002(\t\022)\n\017originalAddre" + "ss\030\004 \002(\0132\020.AddressProtocol\022\025\n\ractorInsta" + "nce\030\005 \001(\014\022\033\n\023serializerClassname\030\006 \001(\t\022\024" + - "\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022%\n" + - "\tlifeCycle\030\t \001(\0132\022.LifeCycleProtocol\022+\n\n", - "supervisor\030\n \001(\0132\027.RemoteActorRefProtoco" + - "l\022\024\n\014hotswapStack\030\013 \001(\014\022(\n\010messages\030\014 \003(" + - "\0132\026.RemoteRequestProtocol\"r\n\017MessageProt" + - "ocol\0225\n\023serializationScheme\030\001 \002(\0162\030.Seri" + - "alizationSchemeType\022\017\n\007message\030\002 \002(\014\022\027\n\017" + - "messageManifest\030\003 \001(\014\"\374\001\n\025RemoteRequestP" + - "rotocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \002(\0132\020.M" + - "essageProtocol\022\016\n\006method\030\003 \001(\t\022\016\n\006target" + - "\030\004 \002(\t\022\014\n\004uuid\030\005 \002(\t\022\017\n\007timeout\030\006 \002(\004\022\026\n" + - "\016supervisorUuid\030\007 \001(\t\022\017\n\007isActor\030\010 \002(\010\022\020", - "\n\010isOneWay\030\t \002(\010\022\021\n\tisEscaped\030\n \002(\010\022\'\n\006s" + - "ender\030\013 \001(\0132\027.RemoteActorRefProtocol\"\252\001\n" + - "\023RemoteReplyProtocol\022\n\n\002id\030\001 \002(\004\022!\n\007mess" + - "age\030\002 \001(\0132\020.MessageProtocol\022%\n\texception" + - "\030\003 \001(\0132\022.ExceptionProtocol\022\026\n\016supervisor" + - "Uuid\030\004 \001(\t\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccess" + - "ful\030\006 \002(\010\"_\n\021LifeCycleProtocol\022!\n\tlifeCy" + - "cle\030\001 \002(\0162\016.LifeCycleType\022\022\n\npreRestart\030" + - "\002 \001(\t\022\023\n\013postRestart\030\003 \001(\t\"1\n\017AddressPro" + - "tocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n", - "\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n" + - "\007message\030\002 \002(\t*]\n\027SerializationSchemeTyp" + - "e\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003" + - "\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCyc" + - "leType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)" + - "se.scalablesolutions.akka.remote.protoco" + - "lH\001" + "\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022\026\n" + + "\016receiveTimeout\030\t \001(\004\022%\n\tlifeCycle\030\n \001(\013", + "2\022.LifeCycleProtocol\022+\n\nsupervisor\030\013 \001(\013" + + "2\027.RemoteActorRefProtocol\022\024\n\014hotswapStac" + + "k\030\014 \001(\014\022(\n\010messages\030\r \003(\0132\026.RemoteReques" + + "tProtocol\"r\n\017MessageProtocol\0225\n\023serializ" + + "ationScheme\030\001 \002(\0162\030.SerializationSchemeT" + + "ype\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030" + + "\003 \001(\014\"\374\001\n\025RemoteRequestProtocol\022\n\n\002id\030\001 " + + "\002(\004\022!\n\007message\030\002 \002(\0132\020.MessageProtocol\022\016" + + "\n\006method\030\003 \001(\t\022\016\n\006target\030\004 \002(\t\022\014\n\004uuid\030\005" + + " \002(\t\022\017\n\007timeout\030\006 \002(\004\022\026\n\016supervisorUuid\030", + "\007 \001(\t\022\017\n\007isActor\030\010 \002(\010\022\020\n\010isOneWay\030\t \002(\010" + + "\022\021\n\tisEscaped\030\n \002(\010\022\'\n\006sender\030\013 \001(\0132\027.Re" + + "moteActorRefProtocol\"\252\001\n\023RemoteReplyProt" + + "ocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \001(\0132\020.Mess" + + "ageProtocol\022%\n\texception\030\003 \001(\0132\022.Excepti" + + "onProtocol\022\026\n\016supervisorUuid\030\004 \001(\t\022\017\n\007is" + + "Actor\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\"_\n\021Lif" + + "eCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016.Life" + + "CycleType\022\022\n\npreRestart\030\002 \001(\t\022\023\n\013postRes" + + "tart\030\003 \001(\t\"1\n\017AddressProtocol\022\020\n\010hostnam", + "e\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProtoc" + + "ol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*]" + + "\n\027SerializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007S" + + "BINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022" + + "\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMAN" + + "ENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)se.scalablesolut" + + "ions.akka.remote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4263,7 +4302,7 @@ public final class RemoteProtocol { internal_static_SerializedActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SerializedActorRefProtocol_descriptor, - new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", }, + new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class); internal_static_MessageProtocol_descriptor = diff --git a/akka-core/src/main/protocol/RemoteProtocol.proto b/akka-core/src/main/protocol/RemoteProtocol.proto index 6d8b8995f4..c4e5a8157e 100644 --- a/akka-core/src/main/protocol/RemoteProtocol.proto +++ b/akka-core/src/main/protocol/RemoteProtocol.proto @@ -36,10 +36,11 @@ message SerializedActorRefProtocol { optional string serializerClassname = 6; optional bool isTransactor = 7; optional uint64 timeout = 8; - optional LifeCycleProtocol lifeCycle = 9; - optional RemoteActorRefProtocol supervisor = 10; - optional bytes hotswapStack = 11; - repeated RemoteRequestProtocol messages = 12; + optional uint64 receiveTimeout = 9; + optional LifeCycleProtocol lifeCycle = 10; + optional RemoteActorRefProtocol supervisor = 11; + optional bytes hotswapStack = 12; + repeated RemoteRequestProtocol messages = 13; } /** diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 391fe8d429..79a635ead8 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -606,6 +606,7 @@ sealed class LocalActorRef private[akka]( __port: Int, __isTransactor: Boolean, __timeout: Long, + __receiveTimeout: Option[Long], __lifeCycle: Option[LifeCycle], __supervisor: Option[ActorRef], __hotswap: Option[PartialFunction[Any, Unit]], @@ -627,6 +628,7 @@ sealed class LocalActorRef private[akka]( homeAddress = (__hostname, __port) isTransactor = __isTransactor timeout = __timeout + receiveTimeout = __receiveTimeout lifeCycle = __lifeCycle _supervisor = __supervisor hotswap = __hotswap diff --git a/akka-core/src/main/scala/actor/SerializationProtocol.scala b/akka-core/src/main/scala/actor/SerializationProtocol.scala index d549bb8c80..13e8230638 100644 --- a/akka-core/src/main/scala/actor/SerializationProtocol.scala +++ b/akka-core/src/main/scala/actor/SerializationProtocol.scala @@ -77,14 +77,14 @@ object ActorSerialization { toSerializedActorRefProtocol(a, format).toByteArray } - private def toSerializedActorRefProtocol[T <: Actor](a: ActorRef, format: Format[T]): SerializedActorRefProtocol = { + private def toSerializedActorRefProtocol[T <: Actor](actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = { val lifeCycleProtocol: Option[LifeCycleProtocol] = { def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT) case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY) } val builder = LifeCycleProtocol.newBuilder - a.lifeCycle match { + actorRef.lifeCycle match { case Some(LifeCycle(scope, None, _)) => setScope(builder, scope) Some(builder.build) @@ -98,21 +98,22 @@ object ActorSerialization { } val originalAddress = AddressProtocol.newBuilder - .setHostname(a.homeAddress.getHostName) - .setPort(a.homeAddress.getPort) + .setHostname(actorRef.homeAddress.getHostName) + .setPort(actorRef.homeAddress.getPort) .build val builder = SerializedActorRefProtocol.newBuilder - .setUuid(a.uuid) - .setId(a.id) - .setActorClassname(a.actorClass.getName) + .setUuid(actorRef.uuid) + .setId(actorRef.id) + .setActorClassname(actorRef.actorClass.getName) .setOriginalAddress(originalAddress) - .setIsTransactor(a.isTransactor) - .setTimeout(a.timeout) + .setIsTransactor(actorRef.isTransactor) + .setTimeout(actorRef.timeout) - builder.setActorInstance(ByteString.copyFrom(format.toBinary(a.actor.asInstanceOf[T]))) + actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_)) + builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T]))) lifeCycleProtocol.foreach(builder.setLifeCycle(_)) - a.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s))) + actorRef.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s))) // FIXME: how to serialize the hotswap PartialFunction ?? //hotswap.foreach(builder.setHotswapStack(_)) builder.build @@ -161,6 +162,7 @@ object ActorSerialization { protocol.getOriginalAddress.getPort, if (protocol.hasIsTransactor) protocol.getIsTransactor else false, if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, + if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None, lifeCycle, supervisor, hotswap, diff --git a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala index 99a2b99dc9..2b26c9ad81 100644 --- a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala +++ b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala @@ -110,6 +110,8 @@ class SerializableTypeClassActorSpec extends val actor2 = fromBinary(bytes) actor2.start (actor2 !! "hello").getOrElse("_") should equal("world 3") + + actor2.receiveTimeout should equal (Some(1000)) } it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") { @@ -172,7 +174,8 @@ class MyStatelessActorWithMessagesInMailbox extends Actor { @serializable class MyJavaSerializableActor extends Actor { var count = 0 - + self.receiveTimeout = Some(1000) + def receive = { case "hello" => count = count + 1