Added ActorRef.receiveTimout to remote protocol and LocalActorRef serialization

This commit is contained in:
Jonas Bonér 2010-07-14 15:35:07 +02:00
parent 17b752af8e
commit 809a00a469
5 changed files with 118 additions and 71 deletions

View file

@ -667,29 +667,36 @@ public final class RemoteProtocol {
public boolean hasTimeout() { return hasTimeout; }
public long getTimeout() { return timeout_; }
// optional .LifeCycleProtocol lifeCycle = 9;
public static final int LIFECYCLE_FIELD_NUMBER = 9;
// optional uint64 receiveTimeout = 9;
public static final int RECEIVETIMEOUT_FIELD_NUMBER = 9;
private boolean hasReceiveTimeout;
private long receiveTimeout_ = 0L;
public boolean hasReceiveTimeout() { return hasReceiveTimeout; }
public long getReceiveTimeout() { return receiveTimeout_; }
// optional .LifeCycleProtocol lifeCycle = 10;
public static final int LIFECYCLE_FIELD_NUMBER = 10;
private boolean hasLifeCycle;
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol lifeCycle_;
public boolean hasLifeCycle() { return hasLifeCycle; }
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol getLifeCycle() { return lifeCycle_; }
// optional .RemoteActorRefProtocol supervisor = 10;
public static final int SUPERVISOR_FIELD_NUMBER = 10;
// optional .RemoteActorRefProtocol supervisor = 11;
public static final int SUPERVISOR_FIELD_NUMBER = 11;
private boolean hasSupervisor;
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol supervisor_;
public boolean hasSupervisor() { return hasSupervisor; }
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getSupervisor() { return supervisor_; }
// optional bytes hotswapStack = 11;
public static final int HOTSWAPSTACK_FIELD_NUMBER = 11;
// optional bytes hotswapStack = 12;
public static final int HOTSWAPSTACK_FIELD_NUMBER = 12;
private boolean hasHotswapStack;
private com.google.protobuf.ByteString hotswapStack_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasHotswapStack() { return hasHotswapStack; }
public com.google.protobuf.ByteString getHotswapStack() { return hotswapStack_; }
// repeated .RemoteRequestProtocol messages = 12;
public static final int MESSAGES_FIELD_NUMBER = 12;
// repeated .RemoteRequestProtocol messages = 13;
public static final int MESSAGES_FIELD_NUMBER = 13;
private java.util.List<se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol> messages_ =
java.util.Collections.emptyList();
public java.util.List<se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol> getMessagesList() {
@ -750,17 +757,20 @@ public final class RemoteProtocol {
if (hasTimeout()) {
output.writeUInt64(8, getTimeout());
}
if (hasReceiveTimeout()) {
output.writeUInt64(9, getReceiveTimeout());
}
if (hasLifeCycle()) {
output.writeMessage(9, getLifeCycle());
output.writeMessage(10, getLifeCycle());
}
if (hasSupervisor()) {
output.writeMessage(10, getSupervisor());
output.writeMessage(11, getSupervisor());
}
if (hasHotswapStack()) {
output.writeBytes(11, getHotswapStack());
output.writeBytes(12, getHotswapStack());
}
for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol element : getMessagesList()) {
output.writeMessage(12, element);
output.writeMessage(13, element);
}
getUnknownFields().writeTo(output);
}
@ -803,21 +813,25 @@ public final class RemoteProtocol {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(8, getTimeout());
}
if (hasReceiveTimeout()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(9, getReceiveTimeout());
}
if (hasLifeCycle()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(9, getLifeCycle());
.computeMessageSize(10, getLifeCycle());
}
if (hasSupervisor()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(10, getSupervisor());
.computeMessageSize(11, getSupervisor());
}
if (hasHotswapStack()) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(11, getHotswapStack());
.computeBytesSize(12, getHotswapStack());
}
for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol element : getMessagesList()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(12, element);
.computeMessageSize(13, element);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
@ -1005,6 +1019,9 @@ public final class RemoteProtocol {
if (other.hasTimeout()) {
setTimeout(other.getTimeout());
}
if (other.hasReceiveTimeout()) {
setReceiveTimeout(other.getReceiveTimeout());
}
if (other.hasLifeCycle()) {
mergeLifeCycle(other.getLifeCycle());
}
@ -1082,7 +1099,11 @@ public final class RemoteProtocol {
setTimeout(input.readUInt64());
break;
}
case 74: {
case 72: {
setReceiveTimeout(input.readUInt64());
break;
}
case 82: {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.newBuilder();
if (hasLifeCycle()) {
subBuilder.mergeFrom(getLifeCycle());
@ -1091,7 +1112,7 @@ public final class RemoteProtocol {
setLifeCycle(subBuilder.buildPartial());
break;
}
case 82: {
case 90: {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder();
if (hasSupervisor()) {
subBuilder.mergeFrom(getSupervisor());
@ -1100,11 +1121,11 @@ public final class RemoteProtocol {
setSupervisor(subBuilder.buildPartial());
break;
}
case 90: {
case 98: {
setHotswapStack(input.readBytes());
break;
}
case 98: {
case 106: {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.newBuilder();
input.readMessage(subBuilder, extensionRegistry);
addMessages(subBuilder.buildPartial());
@ -1293,7 +1314,25 @@ public final class RemoteProtocol {
return this;
}
// optional .LifeCycleProtocol lifeCycle = 9;
// optional uint64 receiveTimeout = 9;
public boolean hasReceiveTimeout() {
return result.hasReceiveTimeout();
}
public long getReceiveTimeout() {
return result.getReceiveTimeout();
}
public Builder setReceiveTimeout(long value) {
result.hasReceiveTimeout = true;
result.receiveTimeout_ = value;
return this;
}
public Builder clearReceiveTimeout() {
result.hasReceiveTimeout = false;
result.receiveTimeout_ = 0L;
return this;
}
// optional .LifeCycleProtocol lifeCycle = 10;
public boolean hasLifeCycle() {
return result.hasLifeCycle();
}
@ -1330,7 +1369,7 @@ public final class RemoteProtocol {
return this;
}
// optional .RemoteActorRefProtocol supervisor = 10;
// optional .RemoteActorRefProtocol supervisor = 11;
public boolean hasSupervisor() {
return result.hasSupervisor();
}
@ -1367,7 +1406,7 @@ public final class RemoteProtocol {
return this;
}
// optional bytes hotswapStack = 11;
// optional bytes hotswapStack = 12;
public boolean hasHotswapStack() {
return result.hasHotswapStack();
}
@ -1388,7 +1427,7 @@ public final class RemoteProtocol {
return this;
}
// repeated .RemoteRequestProtocol messages = 12;
// repeated .RemoteRequestProtocol messages = 13;
public java.util.List<se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol> getMessagesList() {
return java.util.Collections.unmodifiableList(result.messages_);
}
@ -4210,40 +4249,40 @@ public final class RemoteProtocol {
"\n\024RemoteProtocol.proto\"v\n\026RemoteActorRef" +
"Protocol\022\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassname" +
"\030\002 \002(\t\022%\n\013homeAddress\030\003 \002(\0132\020.AddressPro" +
"tocol\022\017\n\007timeout\030\004 \001(\004\"\350\002\n\032SerializedAct" +
"tocol\022\017\n\007timeout\030\004 \001(\004\"\200\003\n\032SerializedAct" +
"orRefProtocol\022\014\n\004uuid\030\001 \002(\t\022\n\n\002id\030\002 \002(\t\022" +
"\026\n\016actorClassname\030\003 \002(\t\022)\n\017originalAddre" +
"ss\030\004 \002(\0132\020.AddressProtocol\022\025\n\ractorInsta" +
"nce\030\005 \001(\014\022\033\n\023serializerClassname\030\006 \001(\t\022\024" +
"\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022%\n" +
"\tlifeCycle\030\t \001(\0132\022.LifeCycleProtocol\022+\n\n",
"supervisor\030\n \001(\0132\027.RemoteActorRefProtoco" +
"l\022\024\n\014hotswapStack\030\013 \001(\014\022(\n\010messages\030\014 \003(" +
"\0132\026.RemoteRequestProtocol\"r\n\017MessageProt" +
"ocol\0225\n\023serializationScheme\030\001 \002(\0162\030.Seri" +
"alizationSchemeType\022\017\n\007message\030\002 \002(\014\022\027\n\017" +
"messageManifest\030\003 \001(\014\"\374\001\n\025RemoteRequestP" +
"rotocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \002(\0132\020.M" +
"essageProtocol\022\016\n\006method\030\003 \001(\t\022\016\n\006target" +
"\030\004 \002(\t\022\014\n\004uuid\030\005 \002(\t\022\017\n\007timeout\030\006 \002(\004\022\026\n" +
"\016supervisorUuid\030\007 \001(\t\022\017\n\007isActor\030\010 \002(\010\022\020",
"\n\010isOneWay\030\t \002(\010\022\021\n\tisEscaped\030\n \002(\010\022\'\n\006s" +
"ender\030\013 \001(\0132\027.RemoteActorRefProtocol\"\252\001\n" +
"\023RemoteReplyProtocol\022\n\n\002id\030\001 \002(\004\022!\n\007mess" +
"age\030\002 \001(\0132\020.MessageProtocol\022%\n\texception" +
"\030\003 \001(\0132\022.ExceptionProtocol\022\026\n\016supervisor" +
"Uuid\030\004 \001(\t\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccess" +
"ful\030\006 \002(\010\"_\n\021LifeCycleProtocol\022!\n\tlifeCy" +
"cle\030\001 \002(\0162\016.LifeCycleType\022\022\n\npreRestart\030" +
"\002 \001(\t\022\023\n\013postRestart\030\003 \001(\t\"1\n\017AddressPro" +
"tocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n",
"\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n" +
"\007message\030\002 \002(\t*]\n\027SerializationSchemeTyp" +
"e\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003" +
"\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCyc" +
"leType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)" +
"se.scalablesolutions.akka.remote.protoco" +
"lH\001"
"\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022\026\n" +
"\016receiveTimeout\030\t \001(\004\022%\n\tlifeCycle\030\n \001(\013",
"2\022.LifeCycleProtocol\022+\n\nsupervisor\030\013 \001(\013" +
"2\027.RemoteActorRefProtocol\022\024\n\014hotswapStac" +
"k\030\014 \001(\014\022(\n\010messages\030\r \003(\0132\026.RemoteReques" +
"tProtocol\"r\n\017MessageProtocol\0225\n\023serializ" +
"ationScheme\030\001 \002(\0162\030.SerializationSchemeT" +
"ype\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030" +
"\003 \001(\014\"\374\001\n\025RemoteRequestProtocol\022\n\n\002id\030\001 " +
"\002(\004\022!\n\007message\030\002 \002(\0132\020.MessageProtocol\022\016" +
"\n\006method\030\003 \001(\t\022\016\n\006target\030\004 \002(\t\022\014\n\004uuid\030\005" +
" \002(\t\022\017\n\007timeout\030\006 \002(\004\022\026\n\016supervisorUuid\030",
"\007 \001(\t\022\017\n\007isActor\030\010 \002(\010\022\020\n\010isOneWay\030\t \002(\010" +
"\022\021\n\tisEscaped\030\n \002(\010\022\'\n\006sender\030\013 \001(\0132\027.Re" +
"moteActorRefProtocol\"\252\001\n\023RemoteReplyProt" +
"ocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \001(\0132\020.Mess" +
"ageProtocol\022%\n\texception\030\003 \001(\0132\022.Excepti" +
"onProtocol\022\026\n\016supervisorUuid\030\004 \001(\t\022\017\n\007is" +
"Actor\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\"_\n\021Lif" +
"eCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016.Life" +
"CycleType\022\022\n\npreRestart\030\002 \001(\t\022\023\n\013postRes" +
"tart\030\003 \001(\t\"1\n\017AddressProtocol\022\020\n\010hostnam",
"e\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProtoc" +
"ol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*]" +
"\n\027SerializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007S" +
"BINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022" +
"\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMAN" +
"ENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)se.scalablesolut" +
"ions.akka.remote.protocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -4263,7 +4302,7 @@ public final class RemoteProtocol {
internal_static_SerializedActorRefProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SerializedActorRefProtocol_descriptor,
new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", },
new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", },
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class);
internal_static_MessageProtocol_descriptor =

View file

@ -36,10 +36,11 @@ message SerializedActorRefProtocol {
optional string serializerClassname = 6;
optional bool isTransactor = 7;
optional uint64 timeout = 8;
optional LifeCycleProtocol lifeCycle = 9;
optional RemoteActorRefProtocol supervisor = 10;
optional bytes hotswapStack = 11;
repeated RemoteRequestProtocol messages = 12;
optional uint64 receiveTimeout = 9;
optional LifeCycleProtocol lifeCycle = 10;
optional RemoteActorRefProtocol supervisor = 11;
optional bytes hotswapStack = 12;
repeated RemoteRequestProtocol messages = 13;
}
/**

View file

@ -606,6 +606,7 @@ sealed class LocalActorRef private[akka](
__port: Int,
__isTransactor: Boolean,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: Option[LifeCycle],
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
@ -627,6 +628,7 @@ sealed class LocalActorRef private[akka](
homeAddress = (__hostname, __port)
isTransactor = __isTransactor
timeout = __timeout
receiveTimeout = __receiveTimeout
lifeCycle = __lifeCycle
_supervisor = __supervisor
hotswap = __hotswap

View file

@ -77,14 +77,14 @@ object ActorSerialization {
toSerializedActorRefProtocol(a, format).toByteArray
}
private def toSerializedActorRefProtocol[T <: Actor](a: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
private def toSerializedActorRefProtocol[T <: Actor](actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
}
val builder = LifeCycleProtocol.newBuilder
a.lifeCycle match {
actorRef.lifeCycle match {
case Some(LifeCycle(scope, None, _)) =>
setScope(builder, scope)
Some(builder.build)
@ -98,21 +98,22 @@ object ActorSerialization {
}
val originalAddress = AddressProtocol.newBuilder
.setHostname(a.homeAddress.getHostName)
.setPort(a.homeAddress.getPort)
.setHostname(actorRef.homeAddress.getHostName)
.setPort(actorRef.homeAddress.getPort)
.build
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(a.uuid)
.setId(a.id)
.setActorClassname(a.actorClass.getName)
.setUuid(actorRef.uuid)
.setId(actorRef.id)
.setActorClassname(actorRef.actorClass.getName)
.setOriginalAddress(originalAddress)
.setIsTransactor(a.isTransactor)
.setTimeout(a.timeout)
.setIsTransactor(actorRef.isTransactor)
.setTimeout(actorRef.timeout)
builder.setActorInstance(ByteString.copyFrom(format.toBinary(a.actor.asInstanceOf[T])))
actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
a.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
actorRef.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
// FIXME: how to serialize the hotswap PartialFunction ??
//hotswap.foreach(builder.setHotswapStack(_))
builder.build
@ -161,6 +162,7 @@ object ActorSerialization {
protocol.getOriginalAddress.getPort,
if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None,
lifeCycle,
supervisor,
hotswap,

View file

@ -110,6 +110,8 @@ class SerializableTypeClassActorSpec extends
val actor2 = fromBinary(bytes)
actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world 3")
actor2.receiveTimeout should equal (Some(1000))
}
it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") {
@ -172,7 +174,8 @@ class MyStatelessActorWithMessagesInMailbox extends Actor {
@serializable class MyJavaSerializableActor extends Actor {
var count = 0
self.receiveTimeout = Some(1000)
def receive = {
case "hello" =>
count = count + 1