diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 98974a245b..b2b9cf121a 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -55,12 +55,12 @@ class CodecBenchmark { implicit val system = ActorSystem("CodecBenchmark", config) val systemB = ActorSystem("systemB", system.settings.config) - private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + private val envelopePool = new EnvelopeBufferPool(1024 * 1024, 128) private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) val headerIn = HeaderBuilder.in(NoInboundCompressions) - val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) + val envelopeTemplateBuffer = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.LITTLE_ENDIAN) val uniqueLocalAddress = UniqueAddress( system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress, diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index 3521768011..9103899eb4 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -69,7 +69,7 @@ abstract class AeronStreamConsistencySpec r } - val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + val pool = new EnvelopeBufferPool(1024 * 1024, 128) lazy implicit val mat = ActorMaterializer()(system) import system.dispatcher diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 240a1c1c2f..4b0fd67dd0 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -86,7 +86,7 @@ abstract class AeronStreamLatencySpec val driver = MediaDriver.launchEmbedded() - val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + val pool = new EnvelopeBufferPool(1024 * 1024, 128) val stats = new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 27689d59d3..a1f9a7ee4d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -84,7 +84,7 @@ abstract class AeronStreamMaxThroughputSpec val driver = MediaDriver.launchEmbedded() - val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + val pool = new EnvelopeBufferPool(1024 * 1024, 128) val stats = new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index 896e48124d..c2251ed9f4 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -30,6 +30,7 @@ object LatencySpec extends MultiNodeConfig { # for serious measurements you should increase the totalMessagesFactor (30) and repeatCount (3) akka.test.LatencySpec.totalMessagesFactor = 1.0 akka.test.LatencySpec.repeatCount = 1 + akka.test.LatencySpec.real-message = off akka { loglevel = ERROR # avoid TestEventListener @@ -86,18 +87,24 @@ object LatencySpec extends MultiNodeConfig { def receive = { case bytes: Array[Byte] ⇒ if (bytes.length != 0) { - if (count == 0) - startTime = System.nanoTime() if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") - reporter.onMessage(1, payloadSize) - count += 1 - val d = System.nanoTime() - sendTimes.get(count - 1) - histogram.recordValue(d) - if (count == totalMessages) { - printTotal(testName, bytes.length, histogram, System.nanoTime() - startTime) - context.stop(self) - } + receiveMessage(bytes.length) } + case _: TestMessage ⇒ + receiveMessage(payloadSize) + } + + def receiveMessage(size: Int): Unit = { + if (count == 0) + startTime = System.nanoTime() + reporter.onMessage(1, payloadSize) + count += 1 + val d = System.nanoTime() - sendTimes.get(count - 1) + histogram.recordValue(d) + if (count == totalMessages) { + printTotal(testName, size, histogram, System.nanoTime() - startTime) + context.stop(self) + } } def printTotal(testName: String, payloadSize: Long, histogram: Histogram, totalDurationNanos: Long): Unit = { @@ -133,7 +140,8 @@ object LatencySpec extends MultiNodeConfig { testName: String, messageRate: Int, // msg/s payloadSize: Int, - repeat: Int) + repeat: Int, + realMessage: Boolean) } @@ -148,6 +156,7 @@ abstract class LatencySpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.LatencySpec.totalMessagesFactor") val repeatCount = system.settings.config.getInt("akka.test.LatencySpec.repeatCount") + val realMessage = system.settings.config.getBoolean("akka.test.LatencySpec.real-message") var plots = LatencyPlots() @@ -183,32 +192,38 @@ abstract class LatencySpec testName = "warmup", messageRate = 10000, payloadSize = 100, - repeat = repeatCount), + repeat = repeatCount, + realMessage), TestSettings( testName = "rate-100-size-100", messageRate = 100, payloadSize = 100, - repeat = repeatCount), + repeat = repeatCount, + realMessage), TestSettings( testName = "rate-1000-size-100", messageRate = 1000, payloadSize = 100, - repeat = repeatCount), + repeat = repeatCount, + realMessage), TestSettings( testName = "rate-10000-size-100", messageRate = 10000, payloadSize = 100, - repeat = repeatCount), + repeat = repeatCount, + realMessage), TestSettings( testName = "rate-20000-size-100", messageRate = 20000, payloadSize = 100, - repeat = repeatCount), + repeat = repeatCount, + realMessage), TestSettings( testName = "rate-1000-size-1k", messageRate = 1000, payloadSize = 1000, - repeat = repeatCount)) + repeat = repeatCount, + realMessage)) def test(testSettings: TestSettings): Unit = { import testSettings._ @@ -259,6 +274,17 @@ abstract class LatencySpec adjust = math.max(0L, (diff - targetDelay) / 2) } + val msg = + if (testSettings.realMessage) + TestMessage( + id = i, + name = "abc", + status = i % 2 == 0, + description = "ABC", + payload = payload, + items = Vector(TestMessage.Item(1, "A"), TestMessage.Item(2, "B"))) + else payload + echo.tell(payload, receiver) i += 1 } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 335d3bedbe..0932066320 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -31,6 +31,7 @@ object MaxThroughputSpec extends MultiNodeConfig { ConfigFactory.parseString(s""" # for serious measurements you should increase the totalMessagesFactor (20) akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0 + akka.test.MaxThroughputSpec.real-message = off akka { loglevel = INFO log-dead-letters = 1000000 @@ -44,9 +45,11 @@ object MaxThroughputSpec extends MultiNodeConfig { serializers { test = "akka.remote.artery.MaxThroughputSpec$$TestSerializer" + test-message = "akka.remote.artery.TestMessageSerializer" } serialization-bindings { "akka.remote.artery.MaxThroughputSpec$$FlowControl" = test + "akka.remote.artery.TestMessage" = test-message } } remote.artery { @@ -85,6 +88,9 @@ object MaxThroughputSpec extends MultiNodeConfig { if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message") reporter.onMessage(1, payloadSize) c += 1 + case msg: TestMessage ⇒ + reporter.onMessage(1, payloadSize) + c += 1 case Start ⇒ c = 0 sender() ! Start @@ -194,8 +200,19 @@ object MaxThroughputSpec extends MultiNodeConfig { val batchSize = math.min(remaining, burstSize) var i = 0 while (i < batchSize) { - // target ! payload - target.tell(payload, ActorRef.noSender) + val msg = + if (realMessage) + TestMessage( + id = totalMessages - remaining + i, + name = "abc", + status = i % 2 == 0, + description = "ABC", + payload = payload, + items = Vector(TestMessage.Item(1, "A"), TestMessage.Item(2, "B"))) + else payload + + // target ! msg + target.tell(msg, ActorRef.noSender) i += 1 } remaining -= batchSize @@ -214,7 +231,8 @@ object MaxThroughputSpec extends MultiNodeConfig { totalMessages: Long, burstSize: Int, payloadSize: Int, - senderReceiverPairs: Int) { + senderReceiverPairs: Int, + realMessage: Boolean) { // data based on measurement def totalSize(system: ActorSystem) = payloadSize + (if (RARP(system).provider.remoteSettings.Artery.Advanced.Compression.Enabled) 38 else 110) } @@ -267,6 +285,7 @@ abstract class MaxThroughputSpec import MaxThroughputSpec._ val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor") + val realMessage = system.settings.config.getBoolean("akka.test.MaxThroughputSpec.real-message") var plot = PlotResult() @@ -302,31 +321,36 @@ abstract class MaxThroughputSpec totalMessages = adjustedTotalMessages(20000), burstSize = 1000, payloadSize = 100, - senderReceiverPairs = 1), + senderReceiverPairs = 1, + realMessage), TestSettings( testName = "1-to-1", totalMessages = adjustedTotalMessages(50000), burstSize = 1000, payloadSize = 100, - senderReceiverPairs = 1), + senderReceiverPairs = 1, + realMessage), TestSettings( testName = "1-to-1-size-1k", totalMessages = adjustedTotalMessages(20000), burstSize = 1000, payloadSize = 1000, - senderReceiverPairs = 1), + senderReceiverPairs = 1, + realMessage), TestSettings( testName = "1-to-1-size-10k", totalMessages = adjustedTotalMessages(10000), burstSize = 1000, payloadSize = 10000, - senderReceiverPairs = 1), + senderReceiverPairs = 1, + realMessage), TestSettings( testName = "5-to-5", totalMessages = adjustedTotalMessages(20000), burstSize = 200, // don't exceed the send queue capacity 200*5*3=3000 payloadSize = 100, - senderReceiverPairs = 5)) + senderReceiverPairs = 5, + realMessage)) def test(testSettings: TestSettings): Unit = { import testSettings._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestMessage.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestMessage.scala new file mode 100644 index 0000000000..0379ea3799 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestMessage.scala @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.ExtendedActorSystem +import akka.serialization.SerializerWithStringManifest +import akka.serialization.ByteBufferSerializer +import akka.remote.artery.protobuf.{ TestMessages ⇒ proto } +import akka.protobuf.ByteString +import java.util.concurrent.locks.LockSupport + +object TestMessage { + final case class Item(id: Long, name: String) +} + +final case class TestMessage( + id: Long, + name: String, + status: Boolean, + description: String, + payload: Array[Byte], + items: Vector[TestMessage.Item]) + +class TestMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest { + + val TestMessageManifest = "A" + + override val identifier: Int = 101 + + override def manifest(o: AnyRef): String = + o match { + case _: TestMessage ⇒ TestMessageManifest + } + + override def toBinary(o: AnyRef): Array[Byte] = o match { + case msg: TestMessage ⇒ + val builder = proto.TestMessage.newBuilder() + .setId(msg.id) + .setName(msg.name) + .setDescription(msg.description) + .setStatus(msg.status) + .setPayload(ByteString.copyFrom(msg.payload)) + msg.items.foreach { item ⇒ + builder.addItems(proto.Item.newBuilder().setId(item.id).setName(item.name)) + } + builder.build().toByteArray() + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + val protoMsg = proto.TestMessage.parseFrom(bytes) + import scala.collection.JavaConverters._ + val items = protoMsg.getItemsList.asScala.map { item ⇒ + TestMessage.Item(item.getId, item.getName) + }.toVector + + TestMessage( + id = protoMsg.getId, + name = protoMsg.getName, + description = protoMsg.getDescription, + status = protoMsg.getStatus, + payload = protoMsg.getPayload.toByteArray(), + items = items) + } +} diff --git a/akka-remote-tests/src/test/java/akka/remote/artery/protobuf/TestMessages.java b/akka-remote-tests/src/test/java/akka/remote/artery/protobuf/TestMessages.java new file mode 100644 index 0000000000..bde5d4fd81 --- /dev/null +++ b/akka-remote-tests/src/test/java/akka/remote/artery/protobuf/TestMessages.java @@ -0,0 +1,1903 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: TestMessages.proto + +package akka.remote.artery.protobuf; + +public final class TestMessages { + private TestMessages() {} + public static void registerAllExtensions( + akka.protobuf.ExtensionRegistry registry) { + } + public interface TestMessageOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required uint64 id = 1; + /** + * required uint64 id = 1; + */ + boolean hasId(); + /** + * required uint64 id = 1; + */ + long getId(); + + // required string name = 2; + /** + * required string name = 2; + */ + boolean hasName(); + /** + * required string name = 2; + */ + java.lang.String getName(); + /** + * required string name = 2; + */ + akka.protobuf.ByteString + getNameBytes(); + + // required bool status = 3; + /** + * required bool status = 3; + */ + boolean hasStatus(); + /** + * required bool status = 3; + */ + boolean getStatus(); + + // optional string description = 4; + /** + * optional string description = 4; + */ + boolean hasDescription(); + /** + * optional string description = 4; + */ + java.lang.String getDescription(); + /** + * optional string description = 4; + */ + akka.protobuf.ByteString + getDescriptionBytes(); + + // optional bytes payload = 5; + /** + * optional bytes payload = 5; + */ + boolean hasPayload(); + /** + * optional bytes payload = 5; + */ + akka.protobuf.ByteString getPayload(); + + // repeated .Item items = 6; + /** + * repeated .Item items = 6; + */ + java.util.List + getItemsList(); + /** + * repeated .Item items = 6; + */ + akka.remote.artery.protobuf.TestMessages.Item getItems(int index); + /** + * repeated .Item items = 6; + */ + int getItemsCount(); + /** + * repeated .Item items = 6; + */ + java.util.List + getItemsOrBuilderList(); + /** + * repeated .Item items = 6; + */ + akka.remote.artery.protobuf.TestMessages.ItemOrBuilder getItemsOrBuilder( + int index); + } + /** + * Protobuf type {@code TestMessage} + */ + public static final class TestMessage extends + akka.protobuf.GeneratedMessage + implements TestMessageOrBuilder { + // Use TestMessage.newBuilder() to construct. + private TestMessage(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private TestMessage(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final TestMessage defaultInstance; + public static TestMessage getDefaultInstance() { + return defaultInstance; + } + + public TestMessage getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private TestMessage( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + id_ = input.readUInt64(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + name_ = input.readBytes(); + break; + } + case 24: { + bitField0_ |= 0x00000004; + status_ = input.readBool(); + break; + } + case 34: { + bitField0_ |= 0x00000008; + description_ = input.readBytes(); + break; + } + case 42: { + bitField0_ |= 0x00000010; + payload_ = input.readBytes(); + break; + } + case 50: { + if (!((mutable_bitField0_ & 0x00000020) == 0x00000020)) { + items_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000020; + } + items_.add(input.readMessage(akka.remote.artery.protobuf.TestMessages.Item.PARSER, extensionRegistry)); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) { + items_ = java.util.Collections.unmodifiableList(items_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.artery.protobuf.TestMessages.internal_static_TestMessage_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.artery.protobuf.TestMessages.internal_static_TestMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.artery.protobuf.TestMessages.TestMessage.class, akka.remote.artery.protobuf.TestMessages.TestMessage.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public TestMessage parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new TestMessage(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required uint64 id = 1; + public static final int ID_FIELD_NUMBER = 1; + private long id_; + /** + * required uint64 id = 1; + */ + public boolean hasId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required uint64 id = 1; + */ + public long getId() { + return id_; + } + + // required string name = 2; + public static final int NAME_FIELD_NUMBER = 2; + private java.lang.Object name_; + /** + * required string name = 2; + */ + public boolean hasName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required string name = 2; + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + name_ = s; + } + return s; + } + } + /** + * required string name = 2; + */ + public akka.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + // required bool status = 3; + public static final int STATUS_FIELD_NUMBER = 3; + private boolean status_; + /** + * required bool status = 3; + */ + public boolean hasStatus() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * required bool status = 3; + */ + public boolean getStatus() { + return status_; + } + + // optional string description = 4; + public static final int DESCRIPTION_FIELD_NUMBER = 4; + private java.lang.Object description_; + /** + * optional string description = 4; + */ + public boolean hasDescription() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional string description = 4; + */ + public java.lang.String getDescription() { + java.lang.Object ref = description_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + description_ = s; + } + return s; + } + } + /** + * optional string description = 4; + */ + public akka.protobuf.ByteString + getDescriptionBytes() { + java.lang.Object ref = description_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + description_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + // optional bytes payload = 5; + public static final int PAYLOAD_FIELD_NUMBER = 5; + private akka.protobuf.ByteString payload_; + /** + * optional bytes payload = 5; + */ + public boolean hasPayload() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes payload = 5; + */ + public akka.protobuf.ByteString getPayload() { + return payload_; + } + + // repeated .Item items = 6; + public static final int ITEMS_FIELD_NUMBER = 6; + private java.util.List items_; + /** + * repeated .Item items = 6; + */ + public java.util.List getItemsList() { + return items_; + } + /** + * repeated .Item items = 6; + */ + public java.util.List + getItemsOrBuilderList() { + return items_; + } + /** + * repeated .Item items = 6; + */ + public int getItemsCount() { + return items_.size(); + } + /** + * repeated .Item items = 6; + */ + public akka.remote.artery.protobuf.TestMessages.Item getItems(int index) { + return items_.get(index); + } + /** + * repeated .Item items = 6; + */ + public akka.remote.artery.protobuf.TestMessages.ItemOrBuilder getItemsOrBuilder( + int index) { + return items_.get(index); + } + + private void initFields() { + id_ = 0L; + name_ = ""; + status_ = false; + description_ = ""; + payload_ = akka.protobuf.ByteString.EMPTY; + items_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasStatus()) { + memoizedIsInitialized = 0; + return false; + } + for (int i = 0; i < getItemsCount(); i++) { + if (!getItems(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeUInt64(1, id_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getNameBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBool(3, status_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBytes(4, getDescriptionBytes()); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, payload_); + } + for (int i = 0; i < items_.size(); i++) { + output.writeMessage(6, items_.get(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeUInt64Size(1, id_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(2, getNameBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeBoolSize(3, status_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(4, getDescriptionBytes()); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(5, payload_); + } + for (int i = 0; i < items_.size(); i++) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(6, items_.get(i)); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.remote.artery.protobuf.TestMessages.TestMessage parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.remote.artery.protobuf.TestMessages.TestMessage prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code TestMessage} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.remote.artery.protobuf.TestMessages.TestMessageOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.artery.protobuf.TestMessages.internal_static_TestMessage_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.artery.protobuf.TestMessages.internal_static_TestMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.artery.protobuf.TestMessages.TestMessage.class, akka.remote.artery.protobuf.TestMessages.TestMessage.Builder.class); + } + + // Construct using akka.remote.artery.protobuf.TestMessages.TestMessage.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getItemsFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + id_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + name_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + status_ = false; + bitField0_ = (bitField0_ & ~0x00000004); + description_ = ""; + bitField0_ = (bitField0_ & ~0x00000008); + payload_ = akka.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + if (itemsBuilder_ == null) { + items_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000020); + } else { + itemsBuilder_.clear(); + } + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.remote.artery.protobuf.TestMessages.internal_static_TestMessage_descriptor; + } + + public akka.remote.artery.protobuf.TestMessages.TestMessage getDefaultInstanceForType() { + return akka.remote.artery.protobuf.TestMessages.TestMessage.getDefaultInstance(); + } + + public akka.remote.artery.protobuf.TestMessages.TestMessage build() { + akka.remote.artery.protobuf.TestMessages.TestMessage result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.remote.artery.protobuf.TestMessages.TestMessage buildPartial() { + akka.remote.artery.protobuf.TestMessages.TestMessage result = new akka.remote.artery.protobuf.TestMessages.TestMessage(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.id_ = id_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.name_ = name_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.status_ = status_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.description_ = description_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.payload_ = payload_; + if (itemsBuilder_ == null) { + if (((bitField0_ & 0x00000020) == 0x00000020)) { + items_ = java.util.Collections.unmodifiableList(items_); + bitField0_ = (bitField0_ & ~0x00000020); + } + result.items_ = items_; + } else { + result.items_ = itemsBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.remote.artery.protobuf.TestMessages.TestMessage) { + return mergeFrom((akka.remote.artery.protobuf.TestMessages.TestMessage)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.remote.artery.protobuf.TestMessages.TestMessage other) { + if (other == akka.remote.artery.protobuf.TestMessages.TestMessage.getDefaultInstance()) return this; + if (other.hasId()) { + setId(other.getId()); + } + if (other.hasName()) { + bitField0_ |= 0x00000002; + name_ = other.name_; + onChanged(); + } + if (other.hasStatus()) { + setStatus(other.getStatus()); + } + if (other.hasDescription()) { + bitField0_ |= 0x00000008; + description_ = other.description_; + onChanged(); + } + if (other.hasPayload()) { + setPayload(other.getPayload()); + } + if (itemsBuilder_ == null) { + if (!other.items_.isEmpty()) { + if (items_.isEmpty()) { + items_ = other.items_; + bitField0_ = (bitField0_ & ~0x00000020); + } else { + ensureItemsIsMutable(); + items_.addAll(other.items_); + } + onChanged(); + } + } else { + if (!other.items_.isEmpty()) { + if (itemsBuilder_.isEmpty()) { + itemsBuilder_.dispose(); + itemsBuilder_ = null; + items_ = other.items_; + bitField0_ = (bitField0_ & ~0x00000020); + itemsBuilder_ = + akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? + getItemsFieldBuilder() : null; + } else { + itemsBuilder_.addAllMessages(other.items_); + } + } + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasId()) { + + return false; + } + if (!hasName()) { + + return false; + } + if (!hasStatus()) { + + return false; + } + for (int i = 0; i < getItemsCount(); i++) { + if (!getItems(i).isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.remote.artery.protobuf.TestMessages.TestMessage parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.remote.artery.protobuf.TestMessages.TestMessage) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required uint64 id = 1; + private long id_ ; + /** + * required uint64 id = 1; + */ + public boolean hasId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required uint64 id = 1; + */ + public long getId() { + return id_; + } + /** + * required uint64 id = 1; + */ + public Builder setId(long value) { + bitField0_ |= 0x00000001; + id_ = value; + onChanged(); + return this; + } + /** + * required uint64 id = 1; + */ + public Builder clearId() { + bitField0_ = (bitField0_ & ~0x00000001); + id_ = 0L; + onChanged(); + return this; + } + + // required string name = 2; + private java.lang.Object name_ = ""; + /** + * required string name = 2; + */ + public boolean hasName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required string name = 2; + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + name_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string name = 2; + */ + public akka.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string name = 2; + */ + public Builder setName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + name_ = value; + onChanged(); + return this; + } + /** + * required string name = 2; + */ + public Builder clearName() { + bitField0_ = (bitField0_ & ~0x00000002); + name_ = getDefaultInstance().getName(); + onChanged(); + return this; + } + /** + * required string name = 2; + */ + public Builder setNameBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + name_ = value; + onChanged(); + return this; + } + + // required bool status = 3; + private boolean status_ ; + /** + * required bool status = 3; + */ + public boolean hasStatus() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * required bool status = 3; + */ + public boolean getStatus() { + return status_; + } + /** + * required bool status = 3; + */ + public Builder setStatus(boolean value) { + bitField0_ |= 0x00000004; + status_ = value; + onChanged(); + return this; + } + /** + * required bool status = 3; + */ + public Builder clearStatus() { + bitField0_ = (bitField0_ & ~0x00000004); + status_ = false; + onChanged(); + return this; + } + + // optional string description = 4; + private java.lang.Object description_ = ""; + /** + * optional string description = 4; + */ + public boolean hasDescription() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional string description = 4; + */ + public java.lang.String getDescription() { + java.lang.Object ref = description_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + description_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string description = 4; + */ + public akka.protobuf.ByteString + getDescriptionBytes() { + java.lang.Object ref = description_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + description_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * optional string description = 4; + */ + public Builder setDescription( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + description_ = value; + onChanged(); + return this; + } + /** + * optional string description = 4; + */ + public Builder clearDescription() { + bitField0_ = (bitField0_ & ~0x00000008); + description_ = getDefaultInstance().getDescription(); + onChanged(); + return this; + } + /** + * optional string description = 4; + */ + public Builder setDescriptionBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + description_ = value; + onChanged(); + return this; + } + + // optional bytes payload = 5; + private akka.protobuf.ByteString payload_ = akka.protobuf.ByteString.EMPTY; + /** + * optional bytes payload = 5; + */ + public boolean hasPayload() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes payload = 5; + */ + public akka.protobuf.ByteString getPayload() { + return payload_; + } + /** + * optional bytes payload = 5; + */ + public Builder setPayload(akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + payload_ = value; + onChanged(); + return this; + } + /** + * optional bytes payload = 5; + */ + public Builder clearPayload() { + bitField0_ = (bitField0_ & ~0x00000010); + payload_ = getDefaultInstance().getPayload(); + onChanged(); + return this; + } + + // repeated .Item items = 6; + private java.util.List items_ = + java.util.Collections.emptyList(); + private void ensureItemsIsMutable() { + if (!((bitField0_ & 0x00000020) == 0x00000020)) { + items_ = new java.util.ArrayList(items_); + bitField0_ |= 0x00000020; + } + } + + private akka.protobuf.RepeatedFieldBuilder< + akka.remote.artery.protobuf.TestMessages.Item, akka.remote.artery.protobuf.TestMessages.Item.Builder, akka.remote.artery.protobuf.TestMessages.ItemOrBuilder> itemsBuilder_; + + /** + * repeated .Item items = 6; + */ + public java.util.List getItemsList() { + if (itemsBuilder_ == null) { + return java.util.Collections.unmodifiableList(items_); + } else { + return itemsBuilder_.getMessageList(); + } + } + /** + * repeated .Item items = 6; + */ + public int getItemsCount() { + if (itemsBuilder_ == null) { + return items_.size(); + } else { + return itemsBuilder_.getCount(); + } + } + /** + * repeated .Item items = 6; + */ + public akka.remote.artery.protobuf.TestMessages.Item getItems(int index) { + if (itemsBuilder_ == null) { + return items_.get(index); + } else { + return itemsBuilder_.getMessage(index); + } + } + /** + * repeated .Item items = 6; + */ + public Builder setItems( + int index, akka.remote.artery.protobuf.TestMessages.Item value) { + if (itemsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureItemsIsMutable(); + items_.set(index, value); + onChanged(); + } else { + itemsBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public Builder setItems( + int index, akka.remote.artery.protobuf.TestMessages.Item.Builder builderForValue) { + if (itemsBuilder_ == null) { + ensureItemsIsMutable(); + items_.set(index, builderForValue.build()); + onChanged(); + } else { + itemsBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public Builder addItems(akka.remote.artery.protobuf.TestMessages.Item value) { + if (itemsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureItemsIsMutable(); + items_.add(value); + onChanged(); + } else { + itemsBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public Builder addItems( + int index, akka.remote.artery.protobuf.TestMessages.Item value) { + if (itemsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureItemsIsMutable(); + items_.add(index, value); + onChanged(); + } else { + itemsBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public Builder addItems( + akka.remote.artery.protobuf.TestMessages.Item.Builder builderForValue) { + if (itemsBuilder_ == null) { + ensureItemsIsMutable(); + items_.add(builderForValue.build()); + onChanged(); + } else { + itemsBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public Builder addItems( + int index, akka.remote.artery.protobuf.TestMessages.Item.Builder builderForValue) { + if (itemsBuilder_ == null) { + ensureItemsIsMutable(); + items_.add(index, builderForValue.build()); + onChanged(); + } else { + itemsBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public Builder addAllItems( + java.lang.Iterable values) { + if (itemsBuilder_ == null) { + ensureItemsIsMutable(); + super.addAll(values, items_); + onChanged(); + } else { + itemsBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public Builder clearItems() { + if (itemsBuilder_ == null) { + items_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000020); + onChanged(); + } else { + itemsBuilder_.clear(); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public Builder removeItems(int index) { + if (itemsBuilder_ == null) { + ensureItemsIsMutable(); + items_.remove(index); + onChanged(); + } else { + itemsBuilder_.remove(index); + } + return this; + } + /** + * repeated .Item items = 6; + */ + public akka.remote.artery.protobuf.TestMessages.Item.Builder getItemsBuilder( + int index) { + return getItemsFieldBuilder().getBuilder(index); + } + /** + * repeated .Item items = 6; + */ + public akka.remote.artery.protobuf.TestMessages.ItemOrBuilder getItemsOrBuilder( + int index) { + if (itemsBuilder_ == null) { + return items_.get(index); } else { + return itemsBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .Item items = 6; + */ + public java.util.List + getItemsOrBuilderList() { + if (itemsBuilder_ != null) { + return itemsBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(items_); + } + } + /** + * repeated .Item items = 6; + */ + public akka.remote.artery.protobuf.TestMessages.Item.Builder addItemsBuilder() { + return getItemsFieldBuilder().addBuilder( + akka.remote.artery.protobuf.TestMessages.Item.getDefaultInstance()); + } + /** + * repeated .Item items = 6; + */ + public akka.remote.artery.protobuf.TestMessages.Item.Builder addItemsBuilder( + int index) { + return getItemsFieldBuilder().addBuilder( + index, akka.remote.artery.protobuf.TestMessages.Item.getDefaultInstance()); + } + /** + * repeated .Item items = 6; + */ + public java.util.List + getItemsBuilderList() { + return getItemsFieldBuilder().getBuilderList(); + } + private akka.protobuf.RepeatedFieldBuilder< + akka.remote.artery.protobuf.TestMessages.Item, akka.remote.artery.protobuf.TestMessages.Item.Builder, akka.remote.artery.protobuf.TestMessages.ItemOrBuilder> + getItemsFieldBuilder() { + if (itemsBuilder_ == null) { + itemsBuilder_ = new akka.protobuf.RepeatedFieldBuilder< + akka.remote.artery.protobuf.TestMessages.Item, akka.remote.artery.protobuf.TestMessages.Item.Builder, akka.remote.artery.protobuf.TestMessages.ItemOrBuilder>( + items_, + ((bitField0_ & 0x00000020) == 0x00000020), + getParentForChildren(), + isClean()); + items_ = null; + } + return itemsBuilder_; + } + + // @@protoc_insertion_point(builder_scope:TestMessage) + } + + static { + defaultInstance = new TestMessage(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:TestMessage) + } + + public interface ItemOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required uint64 id = 1; + /** + * required uint64 id = 1; + */ + boolean hasId(); + /** + * required uint64 id = 1; + */ + long getId(); + + // required string name = 2; + /** + * required string name = 2; + */ + boolean hasName(); + /** + * required string name = 2; + */ + java.lang.String getName(); + /** + * required string name = 2; + */ + akka.protobuf.ByteString + getNameBytes(); + } + /** + * Protobuf type {@code Item} + */ + public static final class Item extends + akka.protobuf.GeneratedMessage + implements ItemOrBuilder { + // Use Item.newBuilder() to construct. + private Item(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private Item(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final Item defaultInstance; + public static Item getDefaultInstance() { + return defaultInstance; + } + + public Item getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private Item( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + id_ = input.readUInt64(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + name_ = input.readBytes(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.artery.protobuf.TestMessages.internal_static_Item_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.artery.protobuf.TestMessages.internal_static_Item_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.artery.protobuf.TestMessages.Item.class, akka.remote.artery.protobuf.TestMessages.Item.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public Item parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new Item(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required uint64 id = 1; + public static final int ID_FIELD_NUMBER = 1; + private long id_; + /** + * required uint64 id = 1; + */ + public boolean hasId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required uint64 id = 1; + */ + public long getId() { + return id_; + } + + // required string name = 2; + public static final int NAME_FIELD_NUMBER = 2; + private java.lang.Object name_; + /** + * required string name = 2; + */ + public boolean hasName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required string name = 2; + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + name_ = s; + } + return s; + } + } + /** + * required string name = 2; + */ + public akka.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + private void initFields() { + id_ = 0L; + name_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasName()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeUInt64(1, id_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getNameBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeUInt64Size(1, id_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(2, getNameBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.remote.artery.protobuf.TestMessages.Item parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.remote.artery.protobuf.TestMessages.Item parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.remote.artery.protobuf.TestMessages.Item prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code Item} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.remote.artery.protobuf.TestMessages.ItemOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.artery.protobuf.TestMessages.internal_static_Item_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.artery.protobuf.TestMessages.internal_static_Item_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.artery.protobuf.TestMessages.Item.class, akka.remote.artery.protobuf.TestMessages.Item.Builder.class); + } + + // Construct using akka.remote.artery.protobuf.TestMessages.Item.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + id_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + name_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.remote.artery.protobuf.TestMessages.internal_static_Item_descriptor; + } + + public akka.remote.artery.protobuf.TestMessages.Item getDefaultInstanceForType() { + return akka.remote.artery.protobuf.TestMessages.Item.getDefaultInstance(); + } + + public akka.remote.artery.protobuf.TestMessages.Item build() { + akka.remote.artery.protobuf.TestMessages.Item result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.remote.artery.protobuf.TestMessages.Item buildPartial() { + akka.remote.artery.protobuf.TestMessages.Item result = new akka.remote.artery.protobuf.TestMessages.Item(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.id_ = id_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.name_ = name_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.remote.artery.protobuf.TestMessages.Item) { + return mergeFrom((akka.remote.artery.protobuf.TestMessages.Item)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.remote.artery.protobuf.TestMessages.Item other) { + if (other == akka.remote.artery.protobuf.TestMessages.Item.getDefaultInstance()) return this; + if (other.hasId()) { + setId(other.getId()); + } + if (other.hasName()) { + bitField0_ |= 0x00000002; + name_ = other.name_; + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasId()) { + + return false; + } + if (!hasName()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.remote.artery.protobuf.TestMessages.Item parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.remote.artery.protobuf.TestMessages.Item) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required uint64 id = 1; + private long id_ ; + /** + * required uint64 id = 1; + */ + public boolean hasId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required uint64 id = 1; + */ + public long getId() { + return id_; + } + /** + * required uint64 id = 1; + */ + public Builder setId(long value) { + bitField0_ |= 0x00000001; + id_ = value; + onChanged(); + return this; + } + /** + * required uint64 id = 1; + */ + public Builder clearId() { + bitField0_ = (bitField0_ & ~0x00000001); + id_ = 0L; + onChanged(); + return this; + } + + // required string name = 2; + private java.lang.Object name_ = ""; + /** + * required string name = 2; + */ + public boolean hasName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required string name = 2; + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + name_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string name = 2; + */ + public akka.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string name = 2; + */ + public Builder setName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + name_ = value; + onChanged(); + return this; + } + /** + * required string name = 2; + */ + public Builder clearName() { + bitField0_ = (bitField0_ & ~0x00000002); + name_ = getDefaultInstance().getName(); + onChanged(); + return this; + } + /** + * required string name = 2; + */ + public Builder setNameBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + name_ = value; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:Item) + } + + static { + defaultInstance = new Item(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:Item) + } + + private static akka.protobuf.Descriptors.Descriptor + internal_static_TestMessage_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_TestMessage_fieldAccessorTable; + private static akka.protobuf.Descriptors.Descriptor + internal_static_Item_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_Item_fieldAccessorTable; + + public static akka.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static akka.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\022TestMessages.proto\"s\n\013TestMessage\022\n\n\002i" + + "d\030\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010\022\023\n" + + "\013description\030\004 \001(\t\022\017\n\007payload\030\005 \001(\014\022\024\n\005i" + + "tems\030\006 \003(\0132\005.Item\" \n\004Item\022\n\n\002id\030\001 \002(\004\022\014\n" + + "\004name\030\002 \002(\tB\035\n\033akka.remote.artery.protob" + + "uf" + }; + akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public akka.protobuf.ExtensionRegistry assignDescriptors( + akka.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_TestMessage_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_TestMessage_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_TestMessage_descriptor, + new java.lang.String[] { "Id", "Name", "Status", "Description", "Payload", "Items", }); + internal_static_Item_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_Item_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_Item_descriptor, + new java.lang.String[] { "Id", "Name", }); + return null; + } + }; + akka.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new akka.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/akka-remote-tests/src/test/protobuf/TestMessages.proto b/akka-remote-tests/src/test/protobuf/TestMessages.proto new file mode 100644 index 0000000000..6715a0c0a0 --- /dev/null +++ b/akka-remote-tests/src/test/protobuf/TestMessages.proto @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ + +option java_package = "akka.remote.artery.protobuf"; + +message TestMessage { + required uint64 id = 1; + required string name = 2; + required bool status = 3; + optional string description = 4; + optional bytes payload = 5; + repeated Item items = 6; +} + +message Item { + required uint64 id = 1; + required string name = 2; +} + diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index bf0fbd141c..99a1116e6a 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -152,6 +152,18 @@ akka { # Level 1 strongly prefer low CPU consumption over low latency. # Level 10 strongly prefer low latency over low CPU consumption. idle-cpu-level = 5 + + # Number of outbound lanes for each outbound association. A value greater than 1 + # means that serialization can be performed in parallel for different destination + # actors. The selection of lane is based on consistent hashing of the recipient + # ActorRef to preserve message ordering per receiver. + outbound-lanes = 1 + + # Total number of inbound lanes, shared among all inbound associations. A value + # greater than 1 means that deserialization can be performed in parallel for + # different destination actors. The selection of lane is based on consistent + # hashing of the recipient ActorRef to preserve message ordering per receiver. + inbound-lanes = 1 # This setting defines the maximum number of unacknowledged system messages # allowed for a remote system. If this limit is reached the remote system is @@ -161,6 +173,9 @@ akka { # unacknowledged system messages are re-delivered with this interval system-message-resend-interval = 1 second + # The timeout for outbound associations to perform the handshake. + handshake-timeout = 15 s + # incomplete handshake attempt is retried with this interval handshake-retry-interval = 1 second diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 8f1735cdda..914fc409b2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -455,14 +455,6 @@ private[akka] trait RemoteRef extends ActorRefScope { final def isLocal = false } -/** - * INTERNAL API - */ -private[remote] sealed abstract class MessageDestinationFlag -private[remote] case object RegularDestination extends MessageDestinationFlag -private[remote] case object LargeDestination extends MessageDestinationFlag -private[remote] case object PriorityDestination extends MessageDestinationFlag - /** * INTERNAL API * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. @@ -488,7 +480,7 @@ private[akka] class RemoteActorRef private[akka] ( @volatile private[remote] var cachedAssociation: artery.Association = null // used by artery to direct messages to separate specialized streams - @volatile private[remote] var cachedMessageDestinationFlag: MessageDestinationFlag = null + @volatile private[remote] var cachedSendQueueIndex: Int = -1 def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index e31a23ad03..365f71daf8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -51,25 +51,43 @@ private[akka] final class ArterySettings private (config: Config) { val DeleteAeronDirectory = getBoolean("delete-aeron-dir") val IdleCpuLevel: Int = getInt("idle-cpu-level").requiring(level ⇒ 1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10") + val OutboundLanes = getInt("outbound-lanes").requiring(n ⇒ + n > 0, "outbound-lanes must be greater than zero") + val InboundLanes = getInt("inbound-lanes").requiring(n ⇒ + n > 0, "inbound-lanes must be greater than zero") val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring( _ > 0, "system-message-buffer-size must be more than zero") val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒ - interval > 0.seconds, "system-message-resend-interval must be more than zero") + interval > Duration.Zero, "system-message-resend-interval must be more than zero") + val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval ⇒ + interval > Duration.Zero, "handshake-timeout must be more than zero") val HandshakeRetryInterval = config.getMillisDuration("handshake-retry-interval").requiring(interval ⇒ - interval > 0.seconds, "handshake-retry-interval must be more than zero") + interval > Duration.Zero, "handshake-retry-interval must be more than zero") val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒ - interval > 0.seconds, "inject-handshake-interval must be more than zero") - val GiveUpSendAfter = config.getMillisDuration("give-up-send-after") - val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout") - val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout") + interval > Duration.Zero, "inject-handshake-interval must be more than zero") + val GiveUpSendAfter = config.getMillisDuration("give-up-send-after").requiring(interval ⇒ + interval > Duration.Zero, "give-up-send-after must be more than zero") + val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒ + interval > Duration.Zero, "shutdown-flush-timeout must be more than zero") + val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒ + interval > Duration.Zero, "inbound-restart-timeout must be more than zero") val InboundMaxRestarts = getInt("inbound-max-restarts") - val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout") + val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒ + interval > Duration.Zero, "outbound-restart-timeout must be more than zero") val OutboundMaxRestarts = getInt("outbound-max-restarts") - val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout") - val ImageLivenessTimeoutNs = config.getMillisDuration("image-liveness-timeout") - val DriverTimeout = config.getMillisDuration("driver-timeout") + val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout").requiring(interval ⇒ + interval > Duration.Zero, "client-liveness-timeout must be more than zero") + val ImageLivenessTimeoutNs = config.getMillisDuration("image-liveness-timeout").requiring(interval ⇒ + interval > Duration.Zero, "image-liveness-timeout must be more than zero") + val DriverTimeout = config.getMillisDuration("driver-timeout").requiring(interval ⇒ + interval > Duration.Zero, "driver-timeout must be more than zero") val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled") val Compression = new Compression(getConfig("compression")) + + final val MaximumFrameSize = 1024 * 1024 + final val MaximumPooledBuffers = 128 + final val MaximumLargeFrameSize = MaximumFrameSize * 5 + final val InboundBroadcastHubBufferSize = MaximumPooledBuffers / 2 } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 16f82393c2..40bbfd709f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -68,6 +68,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil import org.agrona.concurrent.BackoffIdleStrategy +import akka.stream.scaladsl.BroadcastHub /** * INTERNAL API @@ -293,10 +294,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val testStages: CopyOnWriteArrayList[TestManagementApi] = new CopyOnWriteArrayList - private val handshakeTimeout: FiniteDuration = - system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring( - _ > Duration.Zero, - "handshake-timeout must be > 0") + private val inboundLanes = settings.Advanced.InboundLanes private val remoteDispatcher = system.dispatchers.lookup(settings.Dispatcher) @@ -317,15 +315,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" private val controlStreamId = 1 - private val ordinaryStreamId = 3 - private val largeStreamId = 4 + private val ordinaryStreamId = 2 + private val largeStreamId = 3 private val taskRunner = new TaskRunner(system, settings.Advanced.IdleCpuLevel) private val restartCounter = new RestartCounter(settings.Advanced.InboundMaxRestarts, settings.Advanced.InboundRestartTimeout) - private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) - private val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers) + private val envelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumFrameSize, settings.Advanced.MaximumPooledBuffers) + private val largeEnvelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.MaximumPooledBuffers) private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) // FIXME capacity of outboundEnvelopePool should probably be derived from the sendQueue capacity @@ -528,22 +526,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundControlStream(compression: InboundCompressions): Unit = { - val (ctrl, completed) = - if (settings.Advanced.TestMode) { - val (mgmt, (ctrl, completed)) = - aeronSource(controlStreamId, envelopePool) - .via(inboundFlow(compression)) - .viaMat(inboundTestFlow)(Keep.right) - .toMat(inboundControlSink)(Keep.both) - .run()(materializer) - testStages.add(mgmt) - (ctrl, completed) - } else { - aeronSource(controlStreamId, envelopePool) - .via(inboundFlow(compression)) - .toMat(inboundControlSink)(Keep.right) - .run()(materializer) - } + val (testMgmt, ctrl, completed) = + aeronSource(controlStreamId, envelopeBufferPool) + .via(inboundFlow(compression)) + .toMat(inboundControlSink)(Keep.right) + .run()(materializer) + + if (settings.Advanced.TestMode) + testStages.add(testMgmt) controlSubject = ctrl @@ -604,19 +594,54 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def runInboundOrdinaryMessagesStream(compression: InboundCompressions): Unit = { val completed = - if (settings.Advanced.TestMode) { - val (mgmt, c) = aeronSource(ordinaryStreamId, envelopePool) + if (inboundLanes == 1) { + val (testMgmt, completed) = aeronSource(ordinaryStreamId, envelopeBufferPool) .via(inboundFlow(compression)) - .viaMat(inboundTestFlow)(Keep.right) - .toMat(inboundSink)(Keep.both) + .toMat(inboundSink(envelopeBufferPool))(Keep.right) .run()(materializer) - testStages.add(mgmt) - c + + if (settings.Advanced.TestMode) + testStages.add(testMgmt) + + completed + } else { - aeronSource(ordinaryStreamId, envelopePool) + val source = aeronSource(ordinaryStreamId, envelopeBufferPool) .via(inboundFlow(compression)) - .toMat(inboundSink)(Keep.right) - .run()(materializer) + .map(env ⇒ (env.recipient, env)) + + val broadcastHub = source.runWith(BroadcastHub.sink(bufferSize = settings.Advanced.InboundBroadcastHubBufferSize))(materializer) + + val lane = inboundSink(envelopeBufferPool) + + // select lane based on destination, to preserve message order + val partitionFun: OptionVal[ActorRef] ⇒ Int = { + _ match { + case OptionVal.Some(r) ⇒ math.abs(r.path.uid) % inboundLanes + case OptionVal.None ⇒ 0 + } + } + + val values: Vector[(TestManagementApi, Future[Done])] = + (0 until inboundLanes).map { i ⇒ + broadcastHub.runWith( + // TODO replace filter with "PartitionHub" when that is implemented + // must use a tuple here because envelope is pooled and must only be touched in the selected lane + Flow[(OptionVal[ActorRef], InboundEnvelope)].collect { + case (recipient, env) if partitionFun(recipient) == i ⇒ env + } + .toMat(lane)(Keep.right))(materializer) + }(collection.breakOut) + + val (testMgmtValues, completedValues) = values.unzip + + if (settings.Advanced.TestMode) + testMgmtValues.foreach(testStages.add) + + import system.dispatcher + val completed = Future.sequence(completedValues).map(_ ⇒ Done) + + completed } attachStreamRestart("Inbound message stream", completed, () ⇒ runInboundOrdinaryMessagesStream(compression)) @@ -625,21 +650,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def runInboundLargeMessagesStream(): Unit = { val disableCompression = NoInboundCompressions // no compression on large message stream for now - val completed = - if (settings.Advanced.TestMode) { - val (mgmt, c) = aeronSource(largeStreamId, largeEnvelopePool) - .via(inboundLargeFlow(disableCompression)) - .viaMat(inboundTestFlow)(Keep.right) - .toMat(inboundSink)(Keep.both) - .run()(materializer) - testStages.add(mgmt) - c - } else { - aeronSource(largeStreamId, largeEnvelopePool) - .via(inboundLargeFlow(disableCompression)) - .toMat(inboundSink)(Keep.right) - .run()(materializer) - } + val (testMgmt, completed) = aeronSource(largeStreamId, largeEnvelopeBufferPool) + .via(inboundLargeFlow(disableCompression)) + .toMat(inboundSink(largeEnvelopeBufferPool))(Keep.right) + .run()(materializer) + + if (settings.Advanced.TestMode) + testStages.add(testMgmt) attachStreamRestart("Inbound large message stream", completed, () ⇒ runInboundLargeMessagesStream()) } @@ -753,44 +770,59 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) } - def outbound(outboundContext: OutboundContext): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = - createOutboundSink(ordinaryStreamId, outboundContext, envelopePool) - def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] = - createOutboundSink(largeStreamId, outboundContext, largeEnvelopePool) + createOutboundSink(largeStreamId, outboundContext, largeEnvelopeBufferPool) .mapMaterializedValue { case (_, d) ⇒ d } + def outbound(outboundContext: OutboundContext): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = + createOutboundSink(ordinaryStreamId, outboundContext, envelopeBufferPool) + private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { - Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) - .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, - settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval)) - .viaMat(createEncoder(bufferPool))(Keep.right) - .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, - envelopePool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink()))(Keep.both) + outboundLane(outboundContext, bufferPool) + .toMat(aeronSink(outboundContext, streamId))(Keep.both) } - /** - * The outbound stream is defined as two parts to be able to add test stage in-between. - * System messages must not be dropped before the SystemMessageDelivery stage. - */ - def outboundControlPart1(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, SharedKillSwitch] = { + def aeronSink(outboundContext: OutboundContext): Sink[EnvelopeBuffer, Future[Done]] = + aeronSink(outboundContext, ordinaryStreamId) + + private def aeronSink(outboundContext: OutboundContext, streamId: Int): Sink[EnvelopeBuffer, Future[Done]] = { + Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, + envelopeBufferPool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink())) + } + + def outboundLane(outboundContext: OutboundContext): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = + outboundLane(outboundContext, envelopeBufferPool) + + private def outboundLane( + outboundContext: OutboundContext, + bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = { + Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) - .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, + .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout, + settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval)) + .viaMat(createEncoder(bufferPool))(Keep.right) + } + + def outboundControl(outboundContext: OutboundContext): Sink[OutboundEnvelope, (TestManagementApi, OutboundControlIngress, Future[Done])] = { + + Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) + .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout, settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval)) .via(new SystemMessageDelivery(outboundContext, system.deadLetters, settings.Advanced.SystemMessageResendInterval, settings.Advanced.SysMsgBufferSize)) - - // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages - } - - def outboundControlPart2(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { - Flow[OutboundEnvelope] - .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right) - .via(createEncoder(envelopePool)) + // note that System messages must not be dropped before the SystemMessageDelivery stage + .viaMat(outboundTestFlow(outboundContext))(Keep.right) + .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.both) + .via(createEncoder(envelopeBufferPool)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, - envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both) + envelopeBufferPool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both) + .mapMaterializedValue { + case ((a, b), c) ⇒ (a, b, c) + } + + // TODO we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions = @@ -819,34 +851,41 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R inboundEnvelopePool)) } - def decoder(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = - createDecoder(compression, envelopePool) + def createDeserializer(bufferPool: EnvelopeBufferPool): Flow[InboundEnvelope, InboundEnvelope, NotUsed] = + Flow.fromGraph(new Deserializer(this, system, bufferPool)) - def inboundSink: Sink[InboundEnvelope, Future[Done]] = + def inboundSink(bufferPool: EnvelopeBufferPool): Sink[InboundEnvelope, (TestManagementApi, Future[Done])] = Flow[InboundEnvelope] + .via(createDeserializer(bufferPool)) + .viaMat(new InboundTestStage(this, settings.Advanced.TestMode))(Keep.right) .via(new InboundHandshake(this, inControlStream = false)) .via(new InboundQuarantineCheck(this)) - .toMat(messageDispatcherSink)(Keep.right) + .toMat(messageDispatcherSink)(Keep.both) def inboundFlow(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { Flow[EnvelopeBuffer] .via(killSwitch.flow) - .via(decoder(compression)) + .via(createDecoder(compression, envelopeBufferPool)) } def inboundLargeFlow(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { Flow[EnvelopeBuffer] .via(killSwitch.flow) - .via(createDecoder(compression, largeEnvelopePool)) + .via(createDecoder(compression, largeEnvelopeBufferPool)) } - def inboundControlSink: Sink[InboundEnvelope, (ControlMessageSubject, Future[Done])] = { + def inboundControlSink: Sink[InboundEnvelope, (TestManagementApi, ControlMessageSubject, Future[Done])] = { Flow[InboundEnvelope] + .via(createDeserializer(envelopeBufferPool)) + .viaMat(new InboundTestStage(this, settings.Advanced.TestMode))(Keep.right) .via(new InboundHandshake(this, inControlStream = true)) .via(new InboundQuarantineCheck(this)) - .viaMat(new InboundControlJunction)(Keep.right) + .viaMat(new InboundControlJunction)(Keep.both) .via(new SystemMessageAcker(this)) .toMat(messageDispatcherSink)(Keep.both) + .mapMaterializedValue { + case ((a, b), c) ⇒ (a, b, c) + } } private def initializeFlightRecorder(): Option[(FileChannel, File, FlightRecorder)] = { @@ -861,11 +900,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R None } - def inboundTestFlow: Flow[InboundEnvelope, InboundEnvelope, TestManagementApi] = - Flow.fromGraph(new InboundTestStage(this)) - - def outboundTestFlow(association: Association): Flow[OutboundEnvelope, OutboundEnvelope, TestManagementApi] = - Flow.fromGraph(new OutboundTestStage(association)) + def outboundTestFlow(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, TestManagementApi] = + Flow.fromGraph(new OutboundTestStage(outboundContext, settings.Advanced.TestMode)) /** INTERNAL API: for testing only. */ private[remote] def triggerCompressionAdvertisements(actorRef: Boolean, manifest: Boolean) = { @@ -888,9 +924,6 @@ private[remote] object ArteryTransport { val ProtocolName = "artery" val Version = 0 - val MaximumFrameSize = 1024 * 1024 - val MaximumPooledBuffers = 256 - val MaximumLargeFrameSize = MaximumFrameSize * 5 /** * Internal API diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 55db97720e..552ce77b65 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -42,6 +42,7 @@ import akka.util.{ Unsafe, WildcardIndex } import akka.util.OptionVal import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.remote.artery.compress.CompressionProtocol._ +import akka.stream.scaladsl.MergeHub /** * INTERNAL API @@ -50,6 +51,10 @@ private[remote] object Association { final case class QueueWrapper(queue: Queue[OutboundEnvelope]) extends SendQueue.ProducerApi[OutboundEnvelope] { override def offer(message: OutboundEnvelope): Boolean = queue.offer(message) } + + final val ControlQueueIndex = 0 + final val LargeQueueIndex = 1 + final val OrdinaryQueueIndex = 2 } /** @@ -70,11 +75,6 @@ private[remote] class Association( import Association._ private val log = Logging(transport.system, getClass.getName) - private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize - // FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue - // such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption - private val queueSize = 3072 - private val largeQueueSize = 256 private val restartCounter = new RestartCounter(transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout) @@ -85,30 +85,57 @@ private[remote] class Association( def createQueue(capacity: Int): Queue[OutboundEnvelope] = new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) - @volatile private[this] var queue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(queueSize)) - @volatile private[this] var largeQueue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(largeQueueSize)) - @volatile private[this] var controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(controlQueueSize)) + private val outboundLanes = transport.settings.Advanced.OutboundLanes + private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize + // FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue + // such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption + private val queueSize = 3072 + private val largeQueueSize = 256 + + private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = Array.ofDim(2 + outboundLanes) + queues(ControlQueueIndex) = QueueWrapper(createQueue(controlQueueSize)) // control stream + queues(LargeQueueIndex) = QueueWrapper(createQueue(largeQueueSize)) // large messages stream + (0 until outboundLanes).foreach { i ⇒ + queues(OrdinaryQueueIndex + i) = QueueWrapper(createQueue(queueSize)) // ordinary messages stream + } + @volatile private[this] var queuesVisibility = false + + private def controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(ControlQueueIndex) + private def largeQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(LargeQueueIndex) + @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ @volatile private[this] var materializing = new CountDownLatch(1) - @volatile private[this] var changeOutboundCompression: Option[ChangeOutboundCompression] = None + @volatile private[this] var changeOutboundCompression: Option[Vector[ChangeOutboundCompression]] = None - def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = + def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = { + import transport.system.dispatcher changeOutboundCompression match { - case Some(c) ⇒ c.changeActorRefCompression(table) - case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + case Some(c) ⇒ + if (c.size == 1) c.head.changeActorRefCompression(table) + else Future.sequence(c.map(_.changeActorRefCompression(table))).map(_ ⇒ Done) + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) } + } - def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = + def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = { + import transport.system.dispatcher changeOutboundCompression match { - case Some(c) ⇒ c.changeClassManifestCompression(table) - case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + case Some(c) ⇒ + if (c.size == 1) c.head.changeClassManifestCompression(table) + else Future.sequence(c.map(_.changeClassManifestCompression(table))).map(_ ⇒ Done) + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) } + } - def clearCompression(): Future[Done] = + def clearCompression(): Future[Done] = { + import transport.system.dispatcher changeOutboundCompression match { - case Some(c) ⇒ c.clearCompression() - case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) + case Some(c) ⇒ + if (c.size == 1) c.head.clearCompression() + else Future.sequence(c.map(_.clearCompression())).map(_ ⇒ Done) + case None ⇒ Future.failed(new ChangeOutboundCompressionFailed) } + } private val _testStages: CopyOnWriteArrayList[TestManagementApi] = new CopyOnWriteArrayList @@ -201,6 +228,9 @@ private[remote] class Association( def createOutboundEnvelope(): OutboundEnvelope = outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender) + // volatile read to see latest queue array + val unused = queuesVisibility + // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system // FIXME where is that ActorSelectionMessage check in old remoting? if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { @@ -219,9 +249,11 @@ private[remote] class Association( val outboundEnvelope1 = createOutboundEnvelope() if (!controlQueue.offer(outboundEnvelope1)) transport.system.deadLetters ! outboundEnvelope1 - val outboundEnvelope2 = createOutboundEnvelope() - if (!queue.offer(outboundEnvelope2)) - transport.system.deadLetters ! outboundEnvelope2 + (0 until outboundLanes).foreach { i ⇒ + val outboundEnvelope2 = createOutboundEnvelope() + if (!queues(OrdinaryQueueIndex + i).offer(outboundEnvelope2)) + transport.system.deadLetters ! outboundEnvelope2 + } case _ ⇒ val outboundEnvelope = createOutboundEnvelope() val queue = selectQueue(recipient) @@ -233,30 +265,35 @@ private[remote] class Association( log.debug("Dropping message to quarantined system {}", remoteAddress) } - @tailrec private def selectQueue(recipient: OptionVal[RemoteActorRef]): ProducerApi[OutboundEnvelope] = { recipient match { case OptionVal.Some(r) ⇒ - r.cachedMessageDestinationFlag match { - case RegularDestination ⇒ queue - case PriorityDestination ⇒ controlQueue - case LargeDestination ⇒ largeQueue - case null ⇒ + val queueIndex = r.cachedSendQueueIndex match { + case -1 ⇒ // only happens when messages are sent to new remote destination // and is then cached on the RemoteActorRef val elements = r.path.elements - if (priorityMessageDestinations.find(elements).isDefined) { - log.debug("Using priority message stream for {}", r.path) - r.cachedMessageDestinationFlag = PriorityDestination - } else if (transport.largeMessageChannelEnabled && largeMessageDestinations.find(elements).isDefined) { - log.debug("Using large message stream for {}", r.path) - r.cachedMessageDestinationFlag = LargeDestination - } else { - r.cachedMessageDestinationFlag = RegularDestination - } - selectQueue(recipient) + val idx = + if (priorityMessageDestinations.find(elements).isDefined) { + log.debug("Using priority message stream for {}", r.path) + ControlQueueIndex + } else if (transport.largeMessageChannelEnabled && largeMessageDestinations.find(elements).isDefined) { + log.debug("Using large message stream for {}", r.path) + LargeQueueIndex + } else if (outboundLanes == 1) { + OrdinaryQueueIndex + } else { + // select lane based on destination, to preserve message order + OrdinaryQueueIndex + (math.abs(r.path.uid) % outboundLanes) + } + r.cachedSendQueueIndex = idx + idx + case idx ⇒ idx } - case OptionVal.None ⇒ queue + queues(queueIndex) + + case OptionVal.None ⇒ + queues(OrdinaryQueueIndex) } } @@ -333,29 +370,22 @@ private[remote] class Association( // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress materializing = new CountDownLatch(1) - val wrapper = getOrCreateQueueWrapper(controlQueue, queueSize) - controlQueue = wrapper // use new underlying queue immediately for restarts + val wrapper = getOrCreateQueueWrapper(ControlQueueIndex, queueSize) + queues(ControlQueueIndex) = wrapper // use new underlying queue immediately for restarts + queuesVisibility = true // volatile write for visibility of the queues array - val (queueValue, (control, completed)) = - if (transport.settings.Advanced.TestMode) { - val ((queueValue, mgmt), (control, completed)) = - Source.fromGraph(new SendQueue[OutboundEnvelope]) - .via(transport.outboundControlPart1(this)) - .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundControlPart2(this))(Keep.both) - .run()(materializer) - _testStages.add(mgmt) - (queueValue, (control, completed)) - } else { - Source.fromGraph(new SendQueue[OutboundEnvelope]) - .via(transport.outboundControlPart1(this)) - .toMat(transport.outboundControlPart2(this))(Keep.both) - .run()(materializer) - } + val (queueValue, (testMgmt, control, completed)) = + Source.fromGraph(new SendQueue[OutboundEnvelope]) + .toMat(transport.outboundControl(this))(Keep.both) + .run()(materializer) + + if (transport.settings.Advanced.TestMode) + _testStages.add(testMgmt) queueValue.inject(wrapper.queue) // replace with the materialized value, still same underlying queue - controlQueue = queueValue + queues(ControlQueueIndex) = queueValue + queuesVisibility = true // volatile write for visibility of the queues array _outboundControlIngress = control materializing.countDown() attachStreamRestart("Outbound control stream", completed, cause ⇒ { @@ -367,61 +397,103 @@ private[remote] class Association( }) } - private def getOrCreateQueueWrapper(q: SendQueue.ProducerApi[OutboundEnvelope], capacity: Int): QueueWrapper = - q match { + private def getOrCreateQueueWrapper(queueIndex: Int, capacity: Int): QueueWrapper = { + val unused = queuesVisibility // volatile read to see latest queues array + queues(queueIndex) match { case existing: QueueWrapper ⇒ existing case _ ⇒ // use new queue for restarts QueueWrapper(createQueue(capacity)) } + } private def runOutboundOrdinaryMessagesStream(): Unit = { - val wrapper = getOrCreateQueueWrapper(queue, queueSize) - queue = wrapper // use new underlying queue immediately for restarts + if (outboundLanes == 1) { + val queueIndex = OrdinaryQueueIndex + val wrapper = getOrCreateQueueWrapper(queueIndex, queueSize) + queues(queueIndex) = wrapper // use new underlying queue immediately for restarts + queuesVisibility = true // volatile write for visibility of the queues array - val (queueValue, (changeCompression, completed)) = - if (transport.settings.Advanced.TestMode) { - val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) + val ((queueValue, testMgmt), (changeCompression, completed)) = + Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) .toMat(transport.outbound(this))(Keep.both) .run()(materializer) - _testStages.add(mgmt) - (queueValue, completed) - } else { - Source.fromGraph(new SendQueue[OutboundEnvelope]) - .toMat(transport.outbound(this))(Keep.both) - .run()(materializer) + + if (transport.settings.Advanced.TestMode) + _testStages.add(testMgmt) + + queueValue.inject(wrapper.queue) + // replace with the materialized value, still same underlying queue + queues(queueIndex) = queueValue + queuesVisibility = true // volatile write for visibility of the queues array + changeOutboundCompression = Some(Vector(changeCompression)) + + attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) + + } else { + val wrappers = (0 until outboundLanes).map { i ⇒ + val wrapper = getOrCreateQueueWrapper(OrdinaryQueueIndex + i, queueSize) + queues(OrdinaryQueueIndex + i) = wrapper // use new underlying queue immediately for restarts + queuesVisibility = true // volatile write for visibility of the queues array + wrapper + }.toVector + + val lane = Source.fromGraph(new SendQueue[OutboundEnvelope]) + .viaMat(transport.outboundTestFlow(this))(Keep.both) + .viaMat(transport.outboundLane(this))(Keep.both) + .watchTermination()(Keep.both) + .mapMaterializedValue { + case (((q, m), c), w) ⇒ ((q, m), (c, w)) + } + + val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer].toMat(transport.aeronSink(this))(Keep.both).run()(materializer) + + val values: Vector[((SendQueue.QueueValue[OutboundEnvelope], TestManagementApi), (Encoder.ChangeOutboundCompression, Future[Done]))] = + (0 until outboundLanes).map { _ ⇒ + lane.to(mergeHub).run()(materializer) + }(collection.breakOut) + + val (a, b) = values.unzip + val (queueValues, testMgmtValues) = a.unzip + val (changeCompressionValues, laneCompletedValues) = b.unzip + + if (transport.settings.Advanced.TestMode) + testMgmtValues.foreach(_testStages.add) + + import transport.system.dispatcher + val completed = Future.sequence(laneCompletedValues).flatMap(_ ⇒ aeronSinkCompleted) + + queueValues.zip(wrappers).zipWithIndex.foreach { + case ((q, w), i) ⇒ + q.inject(w.queue) + queues(OrdinaryQueueIndex + i) = q // replace with the materialized value, still same underlying queue } + queuesVisibility = true // volatile write for visibility of the queues array - queueValue.inject(wrapper.queue) - // replace with the materialized value, still same underlying queue - queue = queueValue - changeOutboundCompression = Some(changeCompression) + changeOutboundCompression = Some(changeCompressionValues) - attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) + attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) + } } private def runOutboundLargeMessagesStream(): Unit = { - val wrapper = getOrCreateQueueWrapper(largeQueue, largeQueueSize) - largeQueue = wrapper // use new underlying queue immediately for restarts + val wrapper = getOrCreateQueueWrapper(LargeQueueIndex, largeQueueSize) + queues(LargeQueueIndex) = wrapper // use new underlying queue immediately for restarts + queuesVisibility = true // volatile write for visibility of the queues array - val (queueValue, completed) = - if (transport.settings.Advanced.TestMode) { - val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) - .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundLarge(this))(Keep.both) - .run()(materializer) - _testStages.add(mgmt) - (queueValue, completed) - } else { - Source.fromGraph(new SendQueue[OutboundEnvelope]) - .toMat(transport.outboundLarge(this))(Keep.both) - .run()(materializer) - } + val ((queueValue, testMgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) + .viaMat(transport.outboundTestFlow(this))(Keep.both) + .toMat(transport.outboundLarge(this))(Keep.both) + .run()(materializer) + + if (transport.settings.Advanced.TestMode) + _testStages.add(testMgmt) queueValue.inject(wrapper.queue) // replace with the materialized value, still same underlying queue - largeQueue = queueValue + queues(LargeQueueIndex) = queueValue + queuesVisibility = true // volatile write for visibility of the queues array attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream()) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index cd6380fe9f..10916fbf93 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -21,6 +21,7 @@ import akka.remote.artery.compress.CompressionTable import akka.Done import akka.stream.stage.GraphStageWithMaterializedValue import scala.concurrent.Promise +import java.util.concurrent.atomic.AtomicInteger /** * INTERNAL API @@ -34,6 +35,7 @@ private[remote] object Encoder { private[remote] class ChangeOutboundCompressionFailed extends RuntimeException( "Change of outbound compression table failed (will be retried), because materialization did not complete yet") + } /** @@ -212,7 +214,6 @@ private[remote] class Decoder( import Decoder.RetryResolveRemoteDeployedRecipient private val localAddress = inboundContext.localAddress.address private val headerBuilder = HeaderBuilder.in(compression) - private val serialization = SerializationExtension(system) private val retryResolveRemoteDeployedRecipientInterval = 50.millis private val retryResolveRemoteDeployedRecipientAttempts = 20 @@ -284,35 +285,24 @@ private[remote] class Decoder( // --- end of hit refs and manifests for heavy-hitter counting } - try { - val deserializedMessage = MessageSerializer.deserializeForArtery( - system, originUid, serialization, headerBuilder.serializer, classManifest, envelope) + val decoded = inEnvelopePool.acquire().init( + recipient, + localAddress, // FIXME: Is this needed anymore? What should we do here? + sender, + originUid, + headerBuilder.serializer, + classManifest, + envelope, + association) - val decoded = inEnvelopePool.acquire().init( - recipient, - localAddress, // FIXME: Is this needed anymore? What should we do here? - deserializedMessage, - sender, // FIXME: No need for an option, decode simply to deadLetters instead - originUid, - association) - - if (recipient.isEmpty && !headerBuilder.isNoRecipient) { - // the remote deployed actor might not be created yet when resolving the - // recipient for the first message that is sent to it, best effort retry - scheduleOnce(RetryResolveRemoteDeployedRecipient( - retryResolveRemoteDeployedRecipientAttempts, - headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) // FIXME IS THIS SAFE? - } else - push(out, decoded) - } catch { - case NonFatal(e) ⇒ - log.warning( - "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}", - headerBuilder.serializer, classManifest, e.getMessage) - pull(in) - } finally { - bufferPool.release(envelope) - } + if (recipient.isEmpty && !headerBuilder.isNoRecipient) { + // the remote deployed actor might not be created yet when resolving the + // recipient for the first message that is sent to it, best effort retry + scheduleOnce(RetryResolveRemoteDeployedRecipient( + retryResolveRemoteDeployedRecipientAttempts, + headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) // FIXME IS THIS SAFE? + } else + push(out, decoded) } private def resolveRecipient(path: String): OptionVal[InternalActorRef] = { @@ -369,3 +359,48 @@ private[remote] class Decoder( } } +/** + * INTERNAL API + */ +private[remote] class Deserializer( + inboundContext: InboundContext, + system: ExtendedActorSystem, + bufferPool: EnvelopeBufferPool) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { + + val in: Inlet[InboundEnvelope] = Inlet("Artery.Deserializer.in") + val out: Outlet[InboundEnvelope] = Outlet("Artery.Deserializer.out") + val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + private val serialization = SerializationExtension(system) + + override protected def logSource = classOf[Deserializer] + + override def onPush(): Unit = { + val envelope = grab(in) + + try { + val deserializedMessage = MessageSerializer.deserializeForArtery( + system, envelope.originUid, serialization, envelope.serializer, envelope.classManifest, envelope.envelopeBuffer) + + push(out, envelope.withMessage(deserializedMessage)) + } catch { + case NonFatal(e) ⇒ + log.warning( + "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}", + envelope.serializer, envelope.classManifest, e.getMessage) + pull(in) + } finally { + val buf = envelope.envelopeBuffer + envelope.releaseEnvelopeBuffer() + bufferPool.release(buf) + } + } + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } +} + diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala index 2c20cb287d..bd15681ef4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala @@ -11,7 +11,10 @@ import akka.actor.ActorRef /** * INTERNAL API */ -private[akka] object InboundEnvelope { +private[remote] object InboundEnvelope { + /** + * Only used in tests + */ def apply( recipient: OptionVal[InternalActorRef], recipientAddress: Address, @@ -20,7 +23,8 @@ private[akka] object InboundEnvelope { originUid: Long, association: OptionVal[OutboundContext]): InboundEnvelope = { val env = new ReusableInboundEnvelope - env.init(recipient, recipientAddress, message, sender, originUid, association) + env.init(recipient, recipientAddress, sender, originUid, -1, "", null, association) + .withMessage(message) } } @@ -28,23 +32,29 @@ private[akka] object InboundEnvelope { /** * INTERNAL API */ -private[akka] trait InboundEnvelope { +private[remote] trait InboundEnvelope { def recipient: OptionVal[InternalActorRef] def recipientAddress: Address - def message: AnyRef def sender: OptionVal[ActorRef] def originUid: Long def association: OptionVal[OutboundContext] + def serializer: Int + def classManifest: String + def message: AnyRef + def envelopeBuffer: EnvelopeBuffer + def withMessage(message: AnyRef): InboundEnvelope + def releaseEnvelopeBuffer(): InboundEnvelope + def withRecipient(ref: InternalActorRef): InboundEnvelope } /** * INTERNAL API */ -private[akka] object ReusableInboundEnvelope { +private[remote] object ReusableInboundEnvelope { def createObjectPool(capacity: Int) = new ObjectPool[ReusableInboundEnvelope]( capacity, create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear()) @@ -56,23 +66,34 @@ private[akka] object ReusableInboundEnvelope { private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { private var _recipient: OptionVal[InternalActorRef] = OptionVal.None private var _recipientAddress: Address = null - private var _message: AnyRef = null private var _sender: OptionVal[ActorRef] = OptionVal.None private var _originUid: Long = 0L private var _association: OptionVal[OutboundContext] = OptionVal.None + private var _serializer: Int = -1 + private var _classManifest: String = null + private var _message: AnyRef = null + private var _envelopeBuffer: EnvelopeBuffer = null override def recipient: OptionVal[InternalActorRef] = _recipient override def recipientAddress: Address = _recipientAddress - override def message: AnyRef = _message override def sender: OptionVal[ActorRef] = _sender override def originUid: Long = _originUid override def association: OptionVal[OutboundContext] = _association + override def serializer: Int = _serializer + override def classManifest: String = _classManifest + override def message: AnyRef = _message + override def envelopeBuffer: EnvelopeBuffer = _envelopeBuffer override def withMessage(message: AnyRef): InboundEnvelope = { _message = message this } + def releaseEnvelopeBuffer(): InboundEnvelope = { + _envelopeBuffer = null + this + } + def withRecipient(ref: InternalActorRef): InboundEnvelope = { _recipient = OptionVal(ref) this @@ -90,15 +111,19 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { def init( recipient: OptionVal[InternalActorRef], recipientAddress: Address, - message: AnyRef, sender: OptionVal[ActorRef], originUid: Long, + serializer: Int, + classManifest: String, + envelopeBuffer: EnvelopeBuffer, association: OptionVal[OutboundContext]): InboundEnvelope = { _recipient = recipient _recipientAddress = recipientAddress - _message = message _sender = sender _originUid = originUid + _serializer = serializer + _classManifest = classManifest + _envelopeBuffer = envelopeBuffer _association = association this } diff --git a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala index b354f32bda..45e975e0e3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala @@ -24,6 +24,7 @@ import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic import akka.util.OptionVal +import akka.stream.stage.GraphStageLogic /** * INTERNAL API @@ -49,6 +50,11 @@ private[remote] class TestManagementApiImpl(stopped: Future[Done], callback: Asy } } +private[remote] class DisabledTestManagementApi extends TestManagementApi { + override def send(command: Any)(implicit ec: ExecutionContext): Future[Done] = + Future.failed(new RuntimeException("TestStage is disabled, enable with MultiNodeConfig.testTransport")) +} + /** * INTERNAL API */ @@ -57,61 +63,70 @@ private[remote] final case class TestManagementMessage(command: Any, done: Promi /** * INTERNAL API */ -private[remote] class OutboundTestStage(outboundContext: OutboundContext) +private[remote] class OutboundTestStage(outboundContext: OutboundContext, enabled: Boolean) extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, OutboundEnvelope], TestManagementApi] { val in: Inlet[OutboundEnvelope] = Inlet("OutboundTestStage.in") val out: Outlet[OutboundEnvelope] = Outlet("OutboundTestStage.out") override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val stoppedPromise = Promise[Done]() + if (enabled) { + val stoppedPromise = Promise[Done]() - // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way - val logic = new TimerGraphStageLogic(shape) with CallbackWrapper[TestManagementMessage] with InHandler with OutHandler with StageLogging { + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new TimerGraphStageLogic(shape) with CallbackWrapper[TestManagementMessage] with InHandler with OutHandler with StageLogging { - private var blackhole = Set.empty[Address] + private var blackhole = Set.empty[Address] - private val callback = getAsyncCallback[TestManagementMessage] { - case TestManagementMessage(command, done) ⇒ - command match { - case SetThrottle(address, Direction.Send | Direction.Both, Blackhole) ⇒ - log.info("blackhole outbound messages to {}", address) - blackhole += address - case SetThrottle(address, Direction.Send | Direction.Both, Unthrottled) ⇒ - log.info("accept outbound messages to {}", address) - blackhole -= address - case _ ⇒ // not interested - } - done.success(Done) + private val callback = getAsyncCallback[TestManagementMessage] { + case TestManagementMessage(command, done) ⇒ + command match { + case SetThrottle(address, Direction.Send | Direction.Both, Blackhole) ⇒ + log.info("blackhole outbound messages to {}", address) + blackhole += address + case SetThrottle(address, Direction.Send | Direction.Both, Unthrottled) ⇒ + log.info("accept outbound messages to {}", address) + blackhole -= address + case _ ⇒ // not interested + } + done.success(Done) + } + + override def preStart(): Unit = { + initCallback(callback.invoke) + } + + override def postStop(): Unit = stoppedPromise.success(Done) + + // InHandler + override def onPush(): Unit = { + val env = grab(in) + if (blackhole(outboundContext.remoteAddress)) { + log.debug( + "dropping outbound message [{}] to [{}] because of blackhole", + env.message.getClass.getName, outboundContext.remoteAddress) + pull(in) // drop message + } else + push(out, env) + } + + // OutHandler + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) } - override def preStart(): Unit = { - initCallback(callback.invoke) + val managementApi: TestManagementApi = new TestManagementApiImpl(stoppedPromise.future, logic) + + (logic, managementApi) + } else { + val logic = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) } - - override def postStop(): Unit = stoppedPromise.success(Done) - - // InHandler - override def onPush(): Unit = { - val env = grab(in) - if (blackhole(outboundContext.remoteAddress)) { - log.debug( - "dropping outbound message [{}] to [{}] because of blackhole", - env.message.getClass.getName, outboundContext.remoteAddress) - pull(in) // drop message - } else - push(out, env) - } - - // OutHandler - override def onPull(): Unit = pull(in) - - setHandlers(in, out, this) + (logic, new DisabledTestManagementApi) } - - val managementApi: TestManagementApi = new TestManagementApiImpl(stoppedPromise.future, logic) - - (logic, managementApi) } } @@ -119,67 +134,76 @@ private[remote] class OutboundTestStage(outboundContext: OutboundContext) /** * INTERNAL API */ -private[remote] class InboundTestStage(inboundContext: InboundContext) +private[remote] class InboundTestStage(inboundContext: InboundContext, enabled: Boolean) extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], TestManagementApi] { val in: Inlet[InboundEnvelope] = Inlet("InboundTestStage.in") val out: Outlet[InboundEnvelope] = Outlet("InboundTestStage.out") override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val stoppedPromise = Promise[Done]() + if (enabled) { + val stoppedPromise = Promise[Done]() - // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way - val logic = new TimerGraphStageLogic(shape) with CallbackWrapper[TestManagementMessage] with InHandler with OutHandler with StageLogging { + // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way + val logic = new TimerGraphStageLogic(shape) with CallbackWrapper[TestManagementMessage] with InHandler with OutHandler with StageLogging { - private var blackhole = Set.empty[Address] + private var blackhole = Set.empty[Address] - private val callback = getAsyncCallback[TestManagementMessage] { - case TestManagementMessage(command, done) ⇒ - command match { - case SetThrottle(address, Direction.Receive | Direction.Both, Blackhole) ⇒ - log.info("blackhole inbound messages from {}", address) - blackhole += address - case SetThrottle(address, Direction.Receive | Direction.Both, Unthrottled) ⇒ - log.info("accept inbound messages from {}", address) - blackhole -= address - case _ ⇒ // not interested - } - done.success(Done) - } - - override def preStart(): Unit = { - initCallback(callback.invoke) - } - - override def postStop(): Unit = stoppedPromise.success(Done) - - // InHandler - override def onPush(): Unit = { - val env = grab(in) - env.association match { - case OptionVal.None ⇒ - // unknown, handshake not completed - push(out, env) - case OptionVal.Some(association) ⇒ - if (blackhole(association.remoteAddress)) { - log.debug( - "dropping inbound message [{}] from [{}] with UID [{}] because of blackhole", - env.message.getClass.getName, association.remoteAddress, env.originUid) - pull(in) // drop message - } else - push(out, env) + private val callback = getAsyncCallback[TestManagementMessage] { + case TestManagementMessage(command, done) ⇒ + command match { + case SetThrottle(address, Direction.Receive | Direction.Both, Blackhole) ⇒ + log.info("blackhole inbound messages from {}", address) + blackhole += address + case SetThrottle(address, Direction.Receive | Direction.Both, Unthrottled) ⇒ + log.info("accept inbound messages from {}", address) + blackhole -= address + case _ ⇒ // not interested + } + done.success(Done) } + + override def preStart(): Unit = { + initCallback(callback.invoke) + } + + override def postStop(): Unit = stoppedPromise.success(Done) + + // InHandler + override def onPush(): Unit = { + val env = grab(in) + env.association match { + case OptionVal.None ⇒ + // unknown, handshake not completed + push(out, env) + case OptionVal.Some(association) ⇒ + if (blackhole(association.remoteAddress)) { + log.debug( + "dropping inbound message [{}] from [{}] with UID [{}] because of blackhole", + env.message.getClass.getName, association.remoteAddress, env.originUid) + pull(in) // drop message + } else + push(out, env) + } + } + + // OutHandler + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) } - // OutHandler - override def onPull(): Unit = pull(in) + val managementApi: TestManagementApi = new TestManagementApiImpl(stoppedPromise.future, logic) - setHandlers(in, out, this) + (logic, managementApi) + } else { + val logic = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } + (logic, new DisabledTestManagementApi) } - - val managementApi: TestManagementApi = new TestManagementApiImpl(stoppedPromise.future, logic) - - (logic, managementApi) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala index 368537ee55..ead929855d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala @@ -39,7 +39,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender { r } - val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + val pool = new EnvelopeBufferPool(1034 * 1024, 128) val matSettings = ActorMaterializerSettings(system).withFuzzing(true) implicit val mat = ActorMaterializer(matSettings)(system) diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala index ce8126368a..a246e865b4 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala @@ -10,6 +10,7 @@ import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.testkit.SocketUtil import akka.testkit.TestActors import com.typesafe.config.ConfigFactory +import akka.testkit.TestProbe object HandshakeFailureSpec { @@ -22,7 +23,7 @@ object HandshakeFailureSpec { remote.artery.enabled = on remote.artery.hostname = localhost remote.artery.port = 0 - remote.handshake-timeout = 2s + remote.artery.advanced.handshake-timeout = 2s } """) diff --git a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala index bb584ea8f2..6312a8bc28 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala @@ -4,7 +4,7 @@ package akka.remote.artery import akka.actor.{ Actor, ActorRef, ActorSelection, Props, RootActorPath } -import akka.remote.{ LargeDestination, RARP, RegularDestination, RemoteActorRef } +import akka.remote.{ RARP, RemoteActorRef } import akka.testkit.TestProbe import akka.util.ByteString @@ -51,7 +51,7 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( senderProbeA.expectMsg(Pong(0)) // flag should be cached now - regularRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(RegularDestination) + regularRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should be >= (Association.OrdinaryQueueIndex) } @@ -75,7 +75,7 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( senderProbeA.expectMsg(Pong(0)) // flag should be cached now - largeRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(LargeDestination) + largeRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should ===(Association.LargeQueueIndex) } @@ -112,8 +112,8 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( remoteProbe.expectMsg(10.seconds, Pong(largeBytes)) // cached flags should be set now - largeRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(LargeDestination) - regularRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(RegularDestination) + largeRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should ===(Association.LargeQueueIndex) + regularRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should be >= (Association.OrdinaryQueueIndex) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala index 77fdca9792..654a31221b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala @@ -21,7 +21,6 @@ object RemoteMessageSerializationSpec { case s if sender().path == another.path ⇒ one ! s } } - val maxPayloadBytes = ArteryTransport.MaximumFrameSize } class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec(""" @@ -31,6 +30,8 @@ class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec(""" import RemoteMessageSerializationSpec._ + val maxPayloadBytes = RARP(system).provider.remoteSettings.Artery.Advanced.MaximumFrameSize + val remoteSystem = newRemoteSystem() val remotePort = port(remoteSystem) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 8149919ee6..da8b7bea11 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -12,6 +12,8 @@ import akka.remote.RARP import akka.testkit.TestActors import akka.actor.PoisonPill import akka.testkit.TestProbe +import akka.actor.ActorRef +import com.typesafe.config.Config object RemoteSendConsistencySpec { @@ -26,7 +28,15 @@ object RemoteSendConsistencySpec { } -class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.config) with ImplicitSender { +class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(RemoteSendConsistencySpec.config) + +class RemoteSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec( + ConfigFactory.parseString(""" + akka.remote.artery.advanced.outbound-lanes = 3 + akka.remote.artery.advanced.inbound-lanes = 3 + """).withFallback(RemoteSendConsistencySpec.config)) + +abstract class AbstractRemoteSendConsistencySpec(config: Config) extends AkkaSpec(config) with ImplicitSender { val systemB = ActorSystem("systemB", system.settings.config) val addressB = RARP(systemB).provider.getDefaultAddress @@ -78,18 +88,24 @@ class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.confi } "be able to send messages concurrently preserving order" in { - val actorOnSystemB = systemB.actorOf(Props(new Actor { - def receive = { - case i: Int ⇒ sender() ! i - } - }), "echo2") + systemB.actorOf(TestActors.echoActorProps, "echoA") + systemB.actorOf(TestActors.echoActorProps, "echoB") + systemB.actorOf(TestActors.echoActorProps, "echoC") - val remoteRef = { - system.actorSelection(rootB / "user" / "echo2") ! Identify(None) + val remoteRefA = { + system.actorSelection(rootB / "user" / "echoA") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + val remoteRefB = { + system.actorSelection(rootB / "user" / "echoB") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + val remoteRefC = { + system.actorSelection(rootB / "user" / "echoC") ! Identify(None) expectMsgType[ActorIdentity].ref.get } - val senderProps = Props(new Actor { + def senderProps(remoteRef: ActorRef) = Props(new Actor { var counter = 1000 remoteRef ! counter @@ -106,10 +122,10 @@ class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.confi } }).withDeploy(Deploy.local) - system.actorOf(senderProps) - system.actorOf(senderProps) - system.actorOf(senderProps) - system.actorOf(senderProps) + system.actorOf(senderProps(remoteRefA)) + system.actorOf(senderProps(remoteRefB)) + system.actorOf(senderProps(remoteRefC)) + system.actorOf(senderProps(remoteRefA)) within(10.seconds) { expectMsg("success") diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 04233b14ef..5ae7d99c58 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -20,7 +20,7 @@ import akka.util.OptionVal import akka.actor.InternalActorRef import akka.dispatch.ExecutionContexts -private[akka] class TestInboundContext( +private[remote] class TestInboundContext( override val localAddress: UniqueAddress, val controlSubject: TestControlMessageSubject = new TestControlMessageSubject, val controlProbe: Option[ActorRef] = None, @@ -61,7 +61,7 @@ private[akka] class TestInboundContext( new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe) } -private[akka] class TestOutboundContext( +private[remote] class TestOutboundContext( override val localAddress: UniqueAddress, override val remoteAddress: Address, override val controlSubject: TestControlMessageSubject, @@ -96,7 +96,7 @@ private[akka] class TestOutboundContext( } -private[akka] class TestControlMessageSubject extends ControlMessageSubject { +private[remote] class TestControlMessageSubject extends ControlMessageSubject { private val observers = new CopyOnWriteArrayList[ControlMessageObserver] @@ -119,7 +119,7 @@ private[akka] class TestControlMessageSubject extends ControlMessageSubject { } -private[akka] class ManualReplyInboundContext( +private[remote] class ManualReplyInboundContext( replyProbe: ActorRef, localAddress: UniqueAddress, controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) { diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala index 1a0d919873..e3d479c7fa 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala @@ -28,7 +28,7 @@ object CompressionIntegrationSpec { remote.artery.enabled = on remote.artery.hostname = localhost remote.artery.port = 0 - remote.handshake-timeout = 10s + remote.artery.advanced.handshake-timeout = 10s remote.artery.advanced.compression { actor-refs.advertisement-interval = 3 seconds diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala index 74ac512e67..f2eff03ea0 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala @@ -30,7 +30,7 @@ object HandshakeShouldDropCompressionTableSpec { remote.artery.enabled = on remote.artery.hostname = localhost remote.artery.port = 0 - remote.handshake-timeout = 10s + remote.artery.advanced.handshake-timeout = 10s remote.artery.advanced.compression { actor-refs {