diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index 5d19c00a06..757c148e3a 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -21,7 +21,7 @@ public final class RemoteProtocol { public final int getNumber() { return value; } - + public static CommandType valueOf(int value) { switch (value) { case 1: return CONNECT; @@ -83,10 +83,9 @@ public final class RemoteProtocol { public interface AkkaRemoteProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional .RemoteMessageProtocol message = 1; - boolean hasMessage(); - akka.remote.RemoteProtocol.RemoteMessageProtocol getMessage(); - akka.remote.RemoteProtocol.RemoteMessageProtocolOrBuilder getMessageOrBuilder(); + // optional bytes payload = 1; + boolean hasPayload(); + com.google.protobuf.ByteString getPayload(); // optional .RemoteControlProtocol instruction = 2; boolean hasInstruction(); @@ -122,17 +121,14 @@ public final class RemoteProtocol { } private int bitField0_; - // optional .RemoteMessageProtocol message = 1; - public static final int MESSAGE_FIELD_NUMBER = 1; - private akka.remote.RemoteProtocol.RemoteMessageProtocol message_; - public boolean hasMessage() { + // optional bytes payload = 1; + public static final int PAYLOAD_FIELD_NUMBER = 1; + private com.google.protobuf.ByteString payload_; + public boolean hasPayload() { return ((bitField0_ & 0x00000001) == 0x00000001); } - public akka.remote.RemoteProtocol.RemoteMessageProtocol getMessage() { - return message_; - } - public akka.remote.RemoteProtocol.RemoteMessageProtocolOrBuilder getMessageOrBuilder() { - return message_; + public com.google.protobuf.ByteString getPayload() { + return payload_; } // optional .RemoteControlProtocol instruction = 2; @@ -149,7 +145,7 @@ public final class RemoteProtocol { } private void initFields() { - message_ = akka.remote.RemoteProtocol.RemoteMessageProtocol.getDefaultInstance(); + payload_ = com.google.protobuf.ByteString.EMPTY; instruction_ = akka.remote.RemoteProtocol.RemoteControlProtocol.getDefaultInstance(); } private byte memoizedIsInitialized = -1; @@ -157,12 +153,6 @@ public final class RemoteProtocol { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (hasMessage()) { - if (!getMessage().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } - } if (hasInstruction()) { if (!getInstruction().isInitialized()) { memoizedIsInitialized = 0; @@ -177,7 +167,7 @@ public final class RemoteProtocol { throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeMessage(1, message_); + output.writeBytes(1, payload_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeMessage(2, instruction_); @@ -193,7 +183,7 @@ public final class RemoteProtocol { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(1, message_); + .computeBytesSize(1, payload_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream @@ -315,7 +305,6 @@ public final class RemoteProtocol { } private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { - getMessageFieldBuilder(); getInstructionFieldBuilder(); } } @@ -325,11 +314,7 @@ public final class RemoteProtocol { public Builder clear() { super.clear(); - if (messageBuilder_ == null) { - message_ = akka.remote.RemoteProtocol.RemoteMessageProtocol.getDefaultInstance(); - } else { - messageBuilder_.clear(); - } + payload_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000001); if (instructionBuilder_ == null) { instruction_ = akka.remote.RemoteProtocol.RemoteControlProtocol.getDefaultInstance(); @@ -378,11 +363,7 @@ public final class RemoteProtocol { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - if (messageBuilder_ == null) { - result.message_ = message_; - } else { - result.message_ = messageBuilder_.build(); - } + result.payload_ = payload_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } @@ -407,8 +388,8 @@ public final class RemoteProtocol { public Builder mergeFrom(akka.remote.RemoteProtocol.AkkaRemoteProtocol other) { if (other == akka.remote.RemoteProtocol.AkkaRemoteProtocol.getDefaultInstance()) return this; - if (other.hasMessage()) { - mergeMessage(other.getMessage()); + if (other.hasPayload()) { + setPayload(other.getPayload()); } if (other.hasInstruction()) { mergeInstruction(other.getInstruction()); @@ -418,12 +399,6 @@ public final class RemoteProtocol { } public final boolean isInitialized() { - if (hasMessage()) { - if (!getMessage().isInitialized()) { - - return false; - } - } if (hasInstruction()) { if (!getInstruction().isInitialized()) { @@ -457,12 +432,8 @@ public final class RemoteProtocol { break; } case 10: { - akka.remote.RemoteProtocol.RemoteMessageProtocol.Builder subBuilder = akka.remote.RemoteProtocol.RemoteMessageProtocol.newBuilder(); - if (hasMessage()) { - subBuilder.mergeFrom(getMessage()); - } - input.readMessage(subBuilder, extensionRegistry); - setMessage(subBuilder.buildPartial()); + bitField0_ |= 0x00000001; + payload_ = input.readBytes(); break; } case 18: { @@ -480,94 +451,28 @@ public final class RemoteProtocol { private int bitField0_; - // optional .RemoteMessageProtocol message = 1; - private akka.remote.RemoteProtocol.RemoteMessageProtocol message_ = akka.remote.RemoteProtocol.RemoteMessageProtocol.getDefaultInstance(); - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.RemoteMessageProtocol, akka.remote.RemoteProtocol.RemoteMessageProtocol.Builder, akka.remote.RemoteProtocol.RemoteMessageProtocolOrBuilder> messageBuilder_; - public boolean hasMessage() { + // optional bytes payload = 1; + private com.google.protobuf.ByteString payload_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasPayload() { return ((bitField0_ & 0x00000001) == 0x00000001); } - public akka.remote.RemoteProtocol.RemoteMessageProtocol getMessage() { - if (messageBuilder_ == null) { - return message_; - } else { - return messageBuilder_.getMessage(); - } + public com.google.protobuf.ByteString getPayload() { + return payload_; } - public Builder setMessage(akka.remote.RemoteProtocol.RemoteMessageProtocol value) { - if (messageBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - message_ = value; - onChanged(); - } else { - messageBuilder_.setMessage(value); - } - bitField0_ |= 0x00000001; - return this; - } - public Builder setMessage( - akka.remote.RemoteProtocol.RemoteMessageProtocol.Builder builderForValue) { - if (messageBuilder_ == null) { - message_ = builderForValue.build(); - onChanged(); - } else { - messageBuilder_.setMessage(builderForValue.build()); - } - bitField0_ |= 0x00000001; - return this; - } - public Builder mergeMessage(akka.remote.RemoteProtocol.RemoteMessageProtocol value) { - if (messageBuilder_ == null) { - if (((bitField0_ & 0x00000001) == 0x00000001) && - message_ != akka.remote.RemoteProtocol.RemoteMessageProtocol.getDefaultInstance()) { - message_ = - akka.remote.RemoteProtocol.RemoteMessageProtocol.newBuilder(message_).mergeFrom(value).buildPartial(); - } else { - message_ = value; - } - onChanged(); - } else { - messageBuilder_.mergeFrom(value); - } - bitField0_ |= 0x00000001; - return this; - } - public Builder clearMessage() { - if (messageBuilder_ == null) { - message_ = akka.remote.RemoteProtocol.RemoteMessageProtocol.getDefaultInstance(); - onChanged(); - } else { - messageBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000001); - return this; - } - public akka.remote.RemoteProtocol.RemoteMessageProtocol.Builder getMessageBuilder() { - bitField0_ |= 0x00000001; + public Builder setPayload(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + payload_ = value; onChanged(); - return getMessageFieldBuilder().getBuilder(); + return this; } - public akka.remote.RemoteProtocol.RemoteMessageProtocolOrBuilder getMessageOrBuilder() { - if (messageBuilder_ != null) { - return messageBuilder_.getMessageOrBuilder(); - } else { - return message_; - } - } - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.RemoteMessageProtocol, akka.remote.RemoteProtocol.RemoteMessageProtocol.Builder, akka.remote.RemoteProtocol.RemoteMessageProtocolOrBuilder> - getMessageFieldBuilder() { - if (messageBuilder_ == null) { - messageBuilder_ = new com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.RemoteMessageProtocol, akka.remote.RemoteProtocol.RemoteMessageProtocol.Builder, akka.remote.RemoteProtocol.RemoteMessageProtocolOrBuilder>( - message_, - getParentForChildren(), - isClean()); - message_ = null; - } - return messageBuilder_; + public Builder clearPayload() { + bitField0_ = (bitField0_ & ~0x00000001); + payload_ = getDefaultInstance().getPayload(); + onChanged(); + return this; } // optional .RemoteControlProtocol instruction = 2; @@ -6504,34 +6409,34 @@ public final class RemoteProtocol { descriptor; static { java.lang.String[] descriptorData = { - "\n\024RemoteProtocol.proto\"j\n\022AkkaRemoteProt" + - "ocol\022\'\n\007message\030\001 \001(\0132\026.RemoteMessagePro" + - "tocol\022+\n\013instruction\030\002 \001(\0132\026.RemoteContr" + - "olProtocol\"\255\001\n\025RemoteMessageProtocol\022$\n\t" + - "recipient\030\001 \002(\0132\021.ActorRefProtocol\022!\n\007me" + - "ssage\030\002 \002(\0132\020.MessageProtocol\022!\n\006sender\030" + - "\004 \001(\0132\021.ActorRefProtocol\022(\n\010metadata\030\005 \003" + - "(\0132\026.MetadataEntryProtocol\"l\n\025RemoteCont" + - "rolProtocol\022!\n\013commandType\030\001 \002(\0162\014.Comma" + - "ndType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origin\030\003 \001(\0132\020", - ".AddressProtocol\" \n\020ActorRefProtocol\022\014\n\004" + - "path\030\001 \002(\t\"Q\n\017MessageProtocol\022\017\n\007message" + - "\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageM" + - "anifest\030\003 \001(\014\"3\n\025MetadataEntryProtocol\022\013" + - "\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"S\n\017AddressPro" + - "tocol\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022" + - "\014\n\004port\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"\216\001\n\027Daem" + - "onMsgCreateProtocol\022\035\n\005props\030\001 \002(\0132\016.Pro" + - "psProtocol\022\037\n\006deploy\030\002 \002(\0132\017.DeployProto" + - "col\022\014\n\004path\030\003 \002(\t\022%\n\nsupervisor\030\004 \002(\0132\021.", - "ActorRefProtocol\"\205\001\n\rPropsProtocol\022\022\n\ndi" + - "spatcher\030\001 \002(\t\022\037\n\006deploy\030\002 \002(\0132\017.DeployP" + - "rotocol\022\030\n\020fromClassCreator\030\003 \001(\t\022\017\n\007cre" + - "ator\030\004 \001(\014\022\024\n\014routerConfig\030\005 \001(\014\"S\n\016Depl" + - "oyProtocol\022\014\n\004path\030\001 \002(\t\022\016\n\006config\030\002 \001(\014" + - "\022\024\n\014routerConfig\030\003 \001(\014\022\r\n\005scope\030\004 \001(\014*7\n" + - "\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002\022" + - "\r\n\tHEARTBEAT\020\003B\017\n\013akka.remoteH\001" + "\n\024RemoteProtocol.proto\"R\n\022AkkaRemoteProt" + + "ocol\022\017\n\007payload\030\001 \001(\014\022+\n\013instruction\030\002 \001" + + "(\0132\026.RemoteControlProtocol\"\255\001\n\025RemoteMes" + + "sageProtocol\022$\n\trecipient\030\001 \002(\0132\021.ActorR" + + "efProtocol\022!\n\007message\030\002 \002(\0132\020.MessagePro" + + "tocol\022!\n\006sender\030\004 \001(\0132\021.ActorRefProtocol" + + "\022(\n\010metadata\030\005 \003(\0132\026.MetadataEntryProtoc" + + "ol\"l\n\025RemoteControlProtocol\022!\n\013commandTy" + + "pe\030\001 \002(\0162\014.CommandType\022\016\n\006cookie\030\002 \001(\t\022 " + + "\n\006origin\030\003 \001(\0132\020.AddressProtocol\" \n\020Acto", + "rRefProtocol\022\014\n\004path\030\001 \002(\t\"Q\n\017MessagePro" + + "tocol\022\017\n\007message\030\001 \002(\014\022\024\n\014serializerId\030\002" + + " \002(\005\022\027\n\017messageManifest\030\003 \001(\014\"3\n\025Metadat" + + "aEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002" + + "(\014\"S\n\017AddressProtocol\022\016\n\006system\030\001 \002(\t\022\020\n" + + "\010hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r\022\020\n\010protoco" + + "l\030\004 \001(\t\"\216\001\n\027DaemonMsgCreateProtocol\022\035\n\005p" + + "rops\030\001 \002(\0132\016.PropsProtocol\022\037\n\006deploy\030\002 \002" + + "(\0132\017.DeployProtocol\022\014\n\004path\030\003 \002(\t\022%\n\nsup" + + "ervisor\030\004 \002(\0132\021.ActorRefProtocol\"\205\001\n\rPro", + "psProtocol\022\022\n\ndispatcher\030\001 \002(\t\022\037\n\006deploy" + + "\030\002 \002(\0132\017.DeployProtocol\022\030\n\020fromClassCrea" + + "tor\030\003 \001(\t\022\017\n\007creator\030\004 \001(\014\022\024\n\014routerConf" + + "ig\030\005 \001(\014\"S\n\016DeployProtocol\022\014\n\004path\030\001 \002(\t" + + "\022\016\n\006config\030\002 \001(\014\022\024\n\014routerConfig\030\003 \001(\014\022\r" + + "\n\005scope\030\004 \001(\014*7\n\013CommandType\022\013\n\007CONNECT\020" + + "\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013akka.r" + + "emoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6543,7 +6448,7 @@ public final class RemoteProtocol { internal_static_AkkaRemoteProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AkkaRemoteProtocol_descriptor, - new java.lang.String[] { "Message", "Instruction", }, + new java.lang.String[] { "Payload", "Instruction", }, akka.remote.RemoteProtocol.AkkaRemoteProtocol.class, akka.remote.RemoteProtocol.AkkaRemoteProtocol.Builder.class); internal_static_RemoteMessageProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 438a2a1e87..5a400e883f 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -14,7 +14,7 @@ option optimize_for = SPEED; *******************************************/ message AkkaRemoteProtocol { - optional RemoteMessageProtocol message = 1; + optional bytes payload = 1; optional RemoteControlProtocol instruction = 2; } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 9b72507593..d977751433 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -173,7 +173,7 @@ private[remote] class EndpointWriter( when(Writing) { case Event(Send(msg, senderOption, recipient), _) ⇒ - val pdu = codec.constructMessagePdu(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption) + val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption) val success = try handle.write(pdu) catch { case NonFatal(e) ⇒ publishAndThrow("Failed to write message to the transport", e) } @@ -205,7 +205,7 @@ private[remote] class EndpointWriter( } private def startReadEndpoint(): Unit = { - reader = context.actorOf(Props(new EndpointReader(codec, msgDispatch)), + reader = context.actorOf(Props(new EndpointReader(codec, handle.localAddress, msgDispatch)), "endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8")) handle.readHandlerPromise.success(reader) context.watch(reader) @@ -221,6 +221,7 @@ private[remote] class EndpointWriter( private[remote] class EndpointReader( val codec: AkkaPduCodec, + val localAddress: Address, val msgDispatch: InboundMessageDispatcher) extends Actor { val provider = context.system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] @@ -228,18 +229,13 @@ private[remote] class EndpointReader( override def receive: Receive = { case Disassociated ⇒ context.stop(self) - // FIXME: Do 2 step deserialization (old-remoting must be removed first) - case InboundPayload(p) ⇒ decodePdu(p) match { - - case Message(recipient, recipientAddress, serializedMessage, senderOption) ⇒ - msgDispatch.dispatch(recipient, recipientAddress, serializedMessage, senderOption) - - case _ ⇒ - } + case InboundPayload(p) ⇒ + val msg = decodePdu(p) + msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption) } - private def decodePdu(pdu: ByteString): AkkaPdu = try { - codec.decodePdu(pdu, provider) + private def decodePdu(pdu: ByteString): Message = try { + codec.decodeMessage(pdu, provider, localAddress) } catch { case NonFatal(e) ⇒ throw new EndpointException("Error while decoding incoming Akka PDU", e) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index df100249c7..94aeb65349 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -188,6 +188,19 @@ class RemoteActorRefProvider( case _ ⇒ local.actorFor(ref, path) } + /* + * INTERNAL API + * Called in deserialization of incoming remote messages. In this case the correct local address is known, therefore + * this method is faster than the actorFor above. + */ + def actorForWithLocalAddress(ref: InternalActorRef, path: String, localAddress: Address): InternalActorRef = path match { + case ActorPathExtractor(address, elems) ⇒ + if (isSelfAddress(address)) actorFor(rootGuardian, elems) + else new RemoteActorRef(this, transport, localAddress, + new RootActorPath(address) / elems, Nobody, props = None, deploy = None) + case _ ⇒ local.actorFor(ref, path) + } + def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index d18abf7126..dddb74cede 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -239,7 +239,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re * Returns a newly created AkkaRemoteProtocol with the given message payload. */ def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = - AkkaRemoteProtocol.newBuilder.setMessage(rmp).build + AkkaRemoteProtocol.newBuilder.setPayload(rmp.toByteString).build /** * Returns a newly created AkkaRemoteProtocol with the given control payload. diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 7568f859ec..8411d312cc 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -12,7 +12,7 @@ import org.jboss.netty.channel.{ ChannelFutureListener, ChannelHandler, DefaultC import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } import org.jboss.netty.handler.execution.ExecutionHandler import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler } -import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } +import akka.remote.RemoteProtocol.{ RemoteMessageProtocol, RemoteControlProtocol, CommandType, AkkaRemoteProtocol } import akka.remote.{ RemoteProtocol, RemoteMessage, RemoteLifeCycleEvent, RemoteClientStarted, RemoteClientShutdown, RemoteClientException, RemoteClientError, RemoteClientDisconnected, RemoteClientConnected } import akka.AkkaException import akka.event.Logging @@ -273,8 +273,8 @@ private[akka] class ActiveRemoteClientHandler( case CommandType.SHUTDOWN ⇒ runOnceNow { client.netty.shutdownClientConnection(remoteAddress) } case _ ⇒ //Ignore others } - case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ - client.netty.receiveMessage(new RemoteMessage(arp.getMessage, client.netty.system)) + case arp: AkkaRemoteProtocol if arp.hasPayload ⇒ + client.netty.receiveMessage(new RemoteMessage(RemoteMessageProtocol.parseFrom(arp.getPayload), client.netty.system)) case other ⇒ throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.netty, client.remoteAddress) } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 15ca143bf8..dfa97f78ce 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -4,7 +4,7 @@ package akka.remote.netty import akka.actor.Address -import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } +import akka.remote.RemoteProtocol.{ RemoteMessageProtocol, RemoteControlProtocol, CommandType, AkkaRemoteProtocol } import akka.remote._ import java.net.InetAddress import java.net.InetSocketAddress @@ -146,8 +146,8 @@ private[akka] class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { event.getMessage match { - case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ - netty.receiveMessage(new RemoteMessage(remote.getMessage, netty.system)) + case remote: AkkaRemoteProtocol if remote.hasPayload ⇒ + netty.receiveMessage(new RemoteMessage(RemoteMessageProtocol.parseFrom(remote.getPayload), netty.system)) case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ val instruction = remote.getInstruction instruction.getCommandType match { diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index 76364f2f60..39db000b6b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -7,6 +7,7 @@ import akka.remote.transport.AkkaPduCodec._ import akka.remote.{ RemoteActorRefProvider, RemoteProtocol } import akka.util.ByteString import com.google.protobuf.InvalidProtocolBufferException +import com.google.protobuf.{ ByteString ⇒ PByteString } class PduCodecException(msg: String, cause: Throwable) extends AkkaException(msg, cause) @@ -20,10 +21,12 @@ private[remote] object AkkaPduCodec { case class Associate(cookie: Option[String], origin: Address) extends AkkaPdu case object Disassociate extends AkkaPdu case object Heartbeat extends AkkaPdu + case class Payload(bytes: ByteString) extends AkkaPdu + case class Message(recipient: InternalActorRef, recipientAddress: Address, serializedMessage: MessageProtocol, - sender: Option[ActorRef]) extends AkkaPdu + senderOption: Option[ActorRef]) } /** @@ -31,7 +34,9 @@ private[remote] object AkkaPduCodec { */ private[remote] trait AkkaPduCodec { - def constructMessagePdu( + def constructPayload(payload: ByteString): ByteString + + def constructMessage( localAddress: Address, recipient: ActorRef, serializedMessage: MessageProtocol, @@ -43,13 +48,15 @@ private[remote] trait AkkaPduCodec { def constructHeartbeat: ByteString - def decodePdu(raw: ByteString, provider: RemoteActorRefProvider): AkkaPdu // Effective enough? + def decodePdu(raw: ByteString): AkkaPdu + + def decodeMessage(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Message } private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { - override def constructMessagePdu( + override def constructMessage( localAddress: Address, recipient: ActorRef, serializedMessage: MessageProtocol, @@ -61,9 +68,12 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { senderOption foreach { ref ⇒ messageBuilder.setSender(serializeActorRef(localAddress, ref)) } messageBuilder.setMessage(serializedMessage) - akkaRemoteProtocolToByteString(AkkaRemoteProtocol.newBuilder().setMessage(messageBuilder.build).build) + akkaMessageProtocolToByteString(messageBuilder.build) } + override def constructPayload(payload: ByteString): ByteString = akkaRemoteProtocolToByteString( + AkkaRemoteProtocol.newBuilder().setPayload(PByteString.copyFrom(payload.asByteBuffer)).build) + override def constructAssociate(cookie: Option[String], origin: Address): ByteString = constructControlMessagePdu(RemoteProtocol.CommandType.CONNECT, cookie, Some(origin)) @@ -73,12 +83,12 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { override val constructHeartbeat: ByteString = constructControlMessagePdu(RemoteProtocol.CommandType.HEARTBEAT, None, None) - override def decodePdu(raw: ByteString, provider: RemoteActorRefProvider): AkkaPdu = { + override def decodePdu(raw: ByteString): AkkaPdu = { try { val pdu = AkkaRemoteProtocol.parseFrom(raw.toArray) - if (pdu.hasMessage) { - decodeMessage(pdu.getMessage, provider) + if (pdu.hasPayload) { + Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer())) } else if (pdu.hasInstruction) { decodeControlPdu(pdu.getInstruction) } else { @@ -89,12 +99,18 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { } } - private def decodeMessage(msgPdu: RemoteMessageProtocol, provider: RemoteActorRefProvider): Message = { + override def decodeMessage( + raw: ByteString, + provider: RemoteActorRefProvider, + localAddress: Address): Message = { + val msgPdu = RemoteMessageProtocol.parseFrom(raw.toArray) Message( - recipient = provider.actorFor(provider.rootGuardian, msgPdu.getRecipient.getPath), + recipient = provider.actorForWithLocalAddress(provider.rootGuardian, msgPdu.getRecipient.getPath, localAddress), recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath), serializedMessage = msgPdu.getMessage, - sender = if (msgPdu.hasSender) Some(provider.actorFor(provider.rootGuardian, msgPdu.getSender.getPath)) else None) + senderOption = (if (msgPdu.hasSender) + Some(provider.actorForWithLocalAddress(provider.rootGuardian, msgPdu.getSender.getPath, localAddress)) + else None)) } private def decodeControlPdu(controlPdu: RemoteControlProtocol): AkkaPdu = { @@ -128,6 +144,9 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { private def akkaRemoteProtocolToByteString(pdu: AkkaRemoteProtocol): ByteString = ByteString(pdu.toByteArray) + private def akkaMessageProtocolToByteString(message: RemoteMessageProtocol): ByteString = + ByteString(message.toByteArray) + private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefProtocol = { val fullActorRefString: String = if (ref.path.address.host.isDefined) ref.path.toString diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index d002792f0c..5aadb4f4fd 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -224,9 +224,7 @@ private[transport] class AkkaProtocolHandle( private val codec: AkkaPduCodec) extends AssociationHandle { - // FIXME: This is currently a hack! The caller should not know anything about the format of the Akka protocol - // but here it does. This is temporary and will be fixed. - override def write(payload: ByteString): Boolean = wrappedHandle.write(payload) + override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload)) override def disassociate(): Unit = stateActor ! DisassociateUnderlying @@ -296,9 +294,6 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat this(InboundUnassociated(associationHandler, wrappedHandle), localAddress, settings, codec, failureDetector) } - // FIXME: This may break with ClusterActorRefProvider if it does not extends RemoteActorRefProvider - val provider = context.system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] - initialData match { case d: OutboundUnassociated ⇒ d.transport.associate(removeScheme(d.remoteAddress)) pipeTo self @@ -375,10 +370,10 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat stop() // Any other activity is considered an implicit acknowledgement of the association - case Message(recipient, recipientAddress, serializedMessage, senderOption) ⇒ + case Payload(payload) ⇒ sendHeartbeat(wrappedHandle) goto(Open) using - AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue(p)) + AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue(payload)) case Heartbeat ⇒ sendHeartbeat(wrappedHandle) @@ -405,9 +400,9 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Heartbeat ⇒ failureDetector.heartbeat(); stay() - case Message(recipient, recipientAddress, serializedMessage, senderOption) ⇒ + case Payload(payload) ⇒ // Queue message until handler is registered - stay() using AssociatedWaitHandler(handlerFuture, wrappedHandle, queue :+ p) + stay() using AssociatedWaitHandler(handlerFuture, wrappedHandle, queue :+ payload) case _ ⇒ stay() } @@ -419,8 +414,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Heartbeat ⇒ failureDetector.heartbeat(); stay() - case Message(recipient, recipientAddress, serializedMessage, senderOption) ⇒ - handler ! InboundPayload(p) + case Payload(payload) ⇒ + handler ! InboundPayload(payload) stay() case _ ⇒ stay() @@ -525,7 +520,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } private def decodePdu(pdu: ByteString): AkkaPdu = ape("Error while decoding incoming Akka PDU of length: " + pdu.length) { - codec.decodePdu(pdu, provider) + codec.decodePdu(pdu) } // Neither heartbeats neither disassociate cares about backing off if write fails: diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index a92f98bc6c..73de5041c5 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -68,10 +68,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val codec = AkkaPduProtobufCodec - val provider = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] - val testMsg = RemoteProtocol.MessageProtocol.newBuilder().setSerializerId(0).setMessage(PByteString.copyFromUtf8("foo")).build - val testMsgPdu: ByteString = codec.constructMessagePdu(localAkkaAddress, self, testMsg, None) + val testEnvelope = codec.constructMessage(localAkkaAddress, self, testMsg, None) + val testMsgPdu: ByteString = codec.constructPayload(testEnvelope) def testHeartbeat = InboundPayload(codec.constructHeartbeat) def testPayload = InboundPayload(testMsgPdu) @@ -91,7 +90,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re def lastActivityIsHeartbeat(registry: AssociationRegistry) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ - codec.decodePdu(payload, provider) match { + codec.decodePdu(payload) match { case Heartbeat ⇒ true case _ ⇒ false } @@ -100,7 +99,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re def lastActivityIsAssociate(registry: AssociationRegistry, cookie: Option[String]) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ - codec.decodePdu(payload, provider) match { + codec.decodePdu(payload) match { case Associate(c, origin) if c == cookie && origin == localAddress ⇒ true case _ ⇒ false } @@ -109,7 +108,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re def lastActivityIsDisassociate(registry: AssociationRegistry) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ - codec.decodePdu(payload, provider) match { + codec.decodePdu(payload) match { case Disassociate ⇒ true case _ ⇒ false } @@ -161,7 +160,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re reader ! testPayload expectMsgPF() { - case InboundPayload(p) ⇒ p must be === testMsgPdu + case InboundPayload(p) ⇒ p must be === testEnvelope } }