support for parallel lanes, #21207

* for parallel serialziation/deserialization
* MergeHub for the outbound lanes
* BroadcastHub + filter for the inbound lanes, until we
  have a PartitionHub
* simplify materialization of test stage
* add RemoteSendConsistencyWithThreeLanesSpec
This commit is contained in:
Patrik Nordwall 2016-08-30 14:37:11 +02:00
parent 3d3a3528bf
commit faf941b4c8
25 changed files with 2653 additions and 383 deletions

View file

@ -55,12 +55,12 @@ class CodecBenchmark {
implicit val system = ActorSystem("CodecBenchmark", config)
val systemB = ActorSystem("systemB", system.settings.config)
private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
private val envelopePool = new EnvelopeBufferPool(1024 * 1024, 128)
private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16)
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16)
val headerIn = HeaderBuilder.in(NoInboundCompressions)
val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN)
val envelopeTemplateBuffer = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.LITTLE_ENDIAN)
val uniqueLocalAddress = UniqueAddress(
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,

View file

@ -69,7 +69,7 @@ abstract class AeronStreamConsistencySpec
r
}
val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
val pool = new EnvelopeBufferPool(1024 * 1024, 128)
lazy implicit val mat = ActorMaterializer()(system)
import system.dispatcher

View file

@ -86,7 +86,7 @@ abstract class AeronStreamLatencySpec
val driver = MediaDriver.launchEmbedded()
val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
val pool = new EnvelopeBufferPool(1024 * 1024, 128)
val stats =
new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE)))

View file

@ -84,7 +84,7 @@ abstract class AeronStreamMaxThroughputSpec
val driver = MediaDriver.launchEmbedded()
val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
val pool = new EnvelopeBufferPool(1024 * 1024, 128)
val stats =
new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE)))

View file

@ -30,6 +30,7 @@ object LatencySpec extends MultiNodeConfig {
# for serious measurements you should increase the totalMessagesFactor (30) and repeatCount (3)
akka.test.LatencySpec.totalMessagesFactor = 1.0
akka.test.LatencySpec.repeatCount = 1
akka.test.LatencySpec.real-message = off
akka {
loglevel = ERROR
# avoid TestEventListener
@ -86,18 +87,24 @@ object LatencySpec extends MultiNodeConfig {
def receive = {
case bytes: Array[Byte]
if (bytes.length != 0) {
if (count == 0)
startTime = System.nanoTime()
if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message")
reporter.onMessage(1, payloadSize)
count += 1
val d = System.nanoTime() - sendTimes.get(count - 1)
histogram.recordValue(d)
if (count == totalMessages) {
printTotal(testName, bytes.length, histogram, System.nanoTime() - startTime)
context.stop(self)
}
receiveMessage(bytes.length)
}
case _: TestMessage
receiveMessage(payloadSize)
}
def receiveMessage(size: Int): Unit = {
if (count == 0)
startTime = System.nanoTime()
reporter.onMessage(1, payloadSize)
count += 1
val d = System.nanoTime() - sendTimes.get(count - 1)
histogram.recordValue(d)
if (count == totalMessages) {
printTotal(testName, size, histogram, System.nanoTime() - startTime)
context.stop(self)
}
}
def printTotal(testName: String, payloadSize: Long, histogram: Histogram, totalDurationNanos: Long): Unit = {
@ -133,7 +140,8 @@ object LatencySpec extends MultiNodeConfig {
testName: String,
messageRate: Int, // msg/s
payloadSize: Int,
repeat: Int)
repeat: Int,
realMessage: Boolean)
}
@ -148,6 +156,7 @@ abstract class LatencySpec
val totalMessagesFactor = system.settings.config.getDouble("akka.test.LatencySpec.totalMessagesFactor")
val repeatCount = system.settings.config.getInt("akka.test.LatencySpec.repeatCount")
val realMessage = system.settings.config.getBoolean("akka.test.LatencySpec.real-message")
var plots = LatencyPlots()
@ -183,32 +192,38 @@ abstract class LatencySpec
testName = "warmup",
messageRate = 10000,
payloadSize = 100,
repeat = repeatCount),
repeat = repeatCount,
realMessage),
TestSettings(
testName = "rate-100-size-100",
messageRate = 100,
payloadSize = 100,
repeat = repeatCount),
repeat = repeatCount,
realMessage),
TestSettings(
testName = "rate-1000-size-100",
messageRate = 1000,
payloadSize = 100,
repeat = repeatCount),
repeat = repeatCount,
realMessage),
TestSettings(
testName = "rate-10000-size-100",
messageRate = 10000,
payloadSize = 100,
repeat = repeatCount),
repeat = repeatCount,
realMessage),
TestSettings(
testName = "rate-20000-size-100",
messageRate = 20000,
payloadSize = 100,
repeat = repeatCount),
repeat = repeatCount,
realMessage),
TestSettings(
testName = "rate-1000-size-1k",
messageRate = 1000,
payloadSize = 1000,
repeat = repeatCount))
repeat = repeatCount,
realMessage))
def test(testSettings: TestSettings): Unit = {
import testSettings._
@ -259,6 +274,17 @@ abstract class LatencySpec
adjust = math.max(0L, (diff - targetDelay) / 2)
}
val msg =
if (testSettings.realMessage)
TestMessage(
id = i,
name = "abc",
status = i % 2 == 0,
description = "ABC",
payload = payload,
items = Vector(TestMessage.Item(1, "A"), TestMessage.Item(2, "B")))
else payload
echo.tell(payload, receiver)
i += 1
}

View file

@ -31,6 +31,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
ConfigFactory.parseString(s"""
# for serious measurements you should increase the totalMessagesFactor (20)
akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0
akka.test.MaxThroughputSpec.real-message = off
akka {
loglevel = INFO
log-dead-letters = 1000000
@ -44,9 +45,11 @@ object MaxThroughputSpec extends MultiNodeConfig {
serializers {
test = "akka.remote.artery.MaxThroughputSpec$$TestSerializer"
test-message = "akka.remote.artery.TestMessageSerializer"
}
serialization-bindings {
"akka.remote.artery.MaxThroughputSpec$$FlowControl" = test
"akka.remote.artery.TestMessage" = test-message
}
}
remote.artery {
@ -85,6 +88,9 @@ object MaxThroughputSpec extends MultiNodeConfig {
if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message")
reporter.onMessage(1, payloadSize)
c += 1
case msg: TestMessage
reporter.onMessage(1, payloadSize)
c += 1
case Start
c = 0
sender() ! Start
@ -194,8 +200,19 @@ object MaxThroughputSpec extends MultiNodeConfig {
val batchSize = math.min(remaining, burstSize)
var i = 0
while (i < batchSize) {
// target ! payload
target.tell(payload, ActorRef.noSender)
val msg =
if (realMessage)
TestMessage(
id = totalMessages - remaining + i,
name = "abc",
status = i % 2 == 0,
description = "ABC",
payload = payload,
items = Vector(TestMessage.Item(1, "A"), TestMessage.Item(2, "B")))
else payload
// target ! msg
target.tell(msg, ActorRef.noSender)
i += 1
}
remaining -= batchSize
@ -214,7 +231,8 @@ object MaxThroughputSpec extends MultiNodeConfig {
totalMessages: Long,
burstSize: Int,
payloadSize: Int,
senderReceiverPairs: Int) {
senderReceiverPairs: Int,
realMessage: Boolean) {
// data based on measurement
def totalSize(system: ActorSystem) = payloadSize + (if (RARP(system).provider.remoteSettings.Artery.Advanced.Compression.Enabled) 38 else 110)
}
@ -267,6 +285,7 @@ abstract class MaxThroughputSpec
import MaxThroughputSpec._
val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor")
val realMessage = system.settings.config.getBoolean("akka.test.MaxThroughputSpec.real-message")
var plot = PlotResult()
@ -302,31 +321,36 @@ abstract class MaxThroughputSpec
totalMessages = adjustedTotalMessages(20000),
burstSize = 1000,
payloadSize = 100,
senderReceiverPairs = 1),
senderReceiverPairs = 1,
realMessage),
TestSettings(
testName = "1-to-1",
totalMessages = adjustedTotalMessages(50000),
burstSize = 1000,
payloadSize = 100,
senderReceiverPairs = 1),
senderReceiverPairs = 1,
realMessage),
TestSettings(
testName = "1-to-1-size-1k",
totalMessages = adjustedTotalMessages(20000),
burstSize = 1000,
payloadSize = 1000,
senderReceiverPairs = 1),
senderReceiverPairs = 1,
realMessage),
TestSettings(
testName = "1-to-1-size-10k",
totalMessages = adjustedTotalMessages(10000),
burstSize = 1000,
payloadSize = 10000,
senderReceiverPairs = 1),
senderReceiverPairs = 1,
realMessage),
TestSettings(
testName = "5-to-5",
totalMessages = adjustedTotalMessages(20000),
burstSize = 200, // don't exceed the send queue capacity 200*5*3=3000
payloadSize = 100,
senderReceiverPairs = 5))
senderReceiverPairs = 5,
realMessage))
def test(testSettings: TestSettings): Unit = {
import testSettings._

View file

@ -0,0 +1,65 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializerWithStringManifest
import akka.serialization.ByteBufferSerializer
import akka.remote.artery.protobuf.{ TestMessages proto }
import akka.protobuf.ByteString
import java.util.concurrent.locks.LockSupport
object TestMessage {
final case class Item(id: Long, name: String)
}
final case class TestMessage(
id: Long,
name: String,
status: Boolean,
description: String,
payload: Array[Byte],
items: Vector[TestMessage.Item])
class TestMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest {
val TestMessageManifest = "A"
override val identifier: Int = 101
override def manifest(o: AnyRef): String =
o match {
case _: TestMessage TestMessageManifest
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
case msg: TestMessage
val builder = proto.TestMessage.newBuilder()
.setId(msg.id)
.setName(msg.name)
.setDescription(msg.description)
.setStatus(msg.status)
.setPayload(ByteString.copyFrom(msg.payload))
msg.items.foreach { item
builder.addItems(proto.Item.newBuilder().setId(item.id).setName(item.name))
}
builder.build().toByteArray()
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
val protoMsg = proto.TestMessage.parseFrom(bytes)
import scala.collection.JavaConverters._
val items = protoMsg.getItemsList.asScala.map { item
TestMessage.Item(item.getId, item.getName)
}.toVector
TestMessage(
id = protoMsg.getId,
name = protoMsg.getName,
description = protoMsg.getDescription,
status = protoMsg.getStatus,
payload = protoMsg.getPayload.toByteArray(),
items = items)
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
option java_package = "akka.remote.artery.protobuf";
message TestMessage {
required uint64 id = 1;
required string name = 2;
required bool status = 3;
optional string description = 4;
optional bytes payload = 5;
repeated Item items = 6;
}
message Item {
required uint64 id = 1;
required string name = 2;
}

View file

@ -152,6 +152,18 @@ akka {
# Level 1 strongly prefer low CPU consumption over low latency.
# Level 10 strongly prefer low latency over low CPU consumption.
idle-cpu-level = 5
# Number of outbound lanes for each outbound association. A value greater than 1
# means that serialization can be performed in parallel for different destination
# actors. The selection of lane is based on consistent hashing of the recipient
# ActorRef to preserve message ordering per receiver.
outbound-lanes = 1
# Total number of inbound lanes, shared among all inbound associations. A value
# greater than 1 means that deserialization can be performed in parallel for
# different destination actors. The selection of lane is based on consistent
# hashing of the recipient ActorRef to preserve message ordering per receiver.
inbound-lanes = 1
# This setting defines the maximum number of unacknowledged system messages
# allowed for a remote system. If this limit is reached the remote system is
@ -161,6 +173,9 @@ akka {
# unacknowledged system messages are re-delivered with this interval
system-message-resend-interval = 1 second
# The timeout for outbound associations to perform the handshake.
handshake-timeout = 15 s
# incomplete handshake attempt is retried with this interval
handshake-retry-interval = 1 second

View file

@ -455,14 +455,6 @@ private[akka] trait RemoteRef extends ActorRefScope {
final def isLocal = false
}
/**
* INTERNAL API
*/
private[remote] sealed abstract class MessageDestinationFlag
private[remote] case object RegularDestination extends MessageDestinationFlag
private[remote] case object LargeDestination extends MessageDestinationFlag
private[remote] case object PriorityDestination extends MessageDestinationFlag
/**
* INTERNAL API
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
@ -488,7 +480,7 @@ private[akka] class RemoteActorRef private[akka] (
@volatile private[remote] var cachedAssociation: artery.Association = null
// used by artery to direct messages to separate specialized streams
@volatile private[remote] var cachedMessageDestinationFlag: MessageDestinationFlag = null
@volatile private[remote] var cachedSendQueueIndex: Int = -1
def getChild(name: Iterator[String]): InternalActorRef = {
val s = name.toStream

View file

@ -51,25 +51,43 @@ private[akka] final class ArterySettings private (config: Config) {
val DeleteAeronDirectory = getBoolean("delete-aeron-dir")
val IdleCpuLevel: Int = getInt("idle-cpu-level").requiring(level
1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10")
val OutboundLanes = getInt("outbound-lanes").requiring(n
n > 0, "outbound-lanes must be greater than zero")
val InboundLanes = getInt("inbound-lanes").requiring(n
n > 0, "inbound-lanes must be greater than zero")
val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring(
_ > 0, "system-message-buffer-size must be more than zero")
val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval
interval > 0.seconds, "system-message-resend-interval must be more than zero")
interval > Duration.Zero, "system-message-resend-interval must be more than zero")
val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval
interval > Duration.Zero, "handshake-timeout must be more than zero")
val HandshakeRetryInterval = config.getMillisDuration("handshake-retry-interval").requiring(interval
interval > 0.seconds, "handshake-retry-interval must be more than zero")
interval > Duration.Zero, "handshake-retry-interval must be more than zero")
val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval
interval > 0.seconds, "inject-handshake-interval must be more than zero")
val GiveUpSendAfter = config.getMillisDuration("give-up-send-after")
val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout")
val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout")
interval > Duration.Zero, "inject-handshake-interval must be more than zero")
val GiveUpSendAfter = config.getMillisDuration("give-up-send-after").requiring(interval
interval > Duration.Zero, "give-up-send-after must be more than zero")
val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval
interval > Duration.Zero, "shutdown-flush-timeout must be more than zero")
val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval
interval > Duration.Zero, "inbound-restart-timeout must be more than zero")
val InboundMaxRestarts = getInt("inbound-max-restarts")
val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout")
val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout").requiring(interval
interval > Duration.Zero, "outbound-restart-timeout must be more than zero")
val OutboundMaxRestarts = getInt("outbound-max-restarts")
val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout")
val ImageLivenessTimeoutNs = config.getMillisDuration("image-liveness-timeout")
val DriverTimeout = config.getMillisDuration("driver-timeout")
val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout").requiring(interval
interval > Duration.Zero, "client-liveness-timeout must be more than zero")
val ImageLivenessTimeoutNs = config.getMillisDuration("image-liveness-timeout").requiring(interval
interval > Duration.Zero, "image-liveness-timeout must be more than zero")
val DriverTimeout = config.getMillisDuration("driver-timeout").requiring(interval
interval > Duration.Zero, "driver-timeout must be more than zero")
val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled")
val Compression = new Compression(getConfig("compression"))
final val MaximumFrameSize = 1024 * 1024
final val MaximumPooledBuffers = 128
final val MaximumLargeFrameSize = MaximumFrameSize * 5
final val InboundBroadcastHubBufferSize = MaximumPooledBuffers / 2
}
}

View file

@ -68,6 +68,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException
import org.agrona.ErrorHandler
import org.agrona.IoUtil
import org.agrona.concurrent.BackoffIdleStrategy
import akka.stream.scaladsl.BroadcastHub
/**
* INTERNAL API
@ -293,10 +294,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val testStages: CopyOnWriteArrayList[TestManagementApi] = new CopyOnWriteArrayList
private val handshakeTimeout: FiniteDuration =
system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(
_ > Duration.Zero,
"handshake-timeout must be > 0")
private val inboundLanes = settings.Advanced.InboundLanes
private val remoteDispatcher = system.dispatchers.lookup(settings.Dispatcher)
@ -317,15 +315,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
private val controlStreamId = 1
private val ordinaryStreamId = 3
private val largeStreamId = 4
private val ordinaryStreamId = 2
private val largeStreamId = 3
private val taskRunner = new TaskRunner(system, settings.Advanced.IdleCpuLevel)
private val restartCounter = new RestartCounter(settings.Advanced.InboundMaxRestarts, settings.Advanced.InboundRestartTimeout)
private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
private val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers)
private val envelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumFrameSize, settings.Advanced.MaximumPooledBuffers)
private val largeEnvelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.MaximumPooledBuffers)
private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16)
// FIXME capacity of outboundEnvelopePool should probably be derived from the sendQueue capacity
@ -528,22 +526,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
private def runInboundControlStream(compression: InboundCompressions): Unit = {
val (ctrl, completed) =
if (settings.Advanced.TestMode) {
val (mgmt, (ctrl, completed)) =
aeronSource(controlStreamId, envelopePool)
.via(inboundFlow(compression))
.viaMat(inboundTestFlow)(Keep.right)
.toMat(inboundControlSink)(Keep.both)
.run()(materializer)
testStages.add(mgmt)
(ctrl, completed)
} else {
aeronSource(controlStreamId, envelopePool)
.via(inboundFlow(compression))
.toMat(inboundControlSink)(Keep.right)
.run()(materializer)
}
val (testMgmt, ctrl, completed) =
aeronSource(controlStreamId, envelopeBufferPool)
.via(inboundFlow(compression))
.toMat(inboundControlSink)(Keep.right)
.run()(materializer)
if (settings.Advanced.TestMode)
testStages.add(testMgmt)
controlSubject = ctrl
@ -604,19 +594,54 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def runInboundOrdinaryMessagesStream(compression: InboundCompressions): Unit = {
val completed =
if (settings.Advanced.TestMode) {
val (mgmt, c) = aeronSource(ordinaryStreamId, envelopePool)
if (inboundLanes == 1) {
val (testMgmt, completed) = aeronSource(ordinaryStreamId, envelopeBufferPool)
.via(inboundFlow(compression))
.viaMat(inboundTestFlow)(Keep.right)
.toMat(inboundSink)(Keep.both)
.toMat(inboundSink(envelopeBufferPool))(Keep.right)
.run()(materializer)
testStages.add(mgmt)
c
if (settings.Advanced.TestMode)
testStages.add(testMgmt)
completed
} else {
aeronSource(ordinaryStreamId, envelopePool)
val source = aeronSource(ordinaryStreamId, envelopeBufferPool)
.via(inboundFlow(compression))
.toMat(inboundSink)(Keep.right)
.run()(materializer)
.map(env (env.recipient, env))
val broadcastHub = source.runWith(BroadcastHub.sink(bufferSize = settings.Advanced.InboundBroadcastHubBufferSize))(materializer)
val lane = inboundSink(envelopeBufferPool)
// select lane based on destination, to preserve message order
val partitionFun: OptionVal[ActorRef] Int = {
_ match {
case OptionVal.Some(r) math.abs(r.path.uid) % inboundLanes
case OptionVal.None 0
}
}
val values: Vector[(TestManagementApi, Future[Done])] =
(0 until inboundLanes).map { i
broadcastHub.runWith(
// TODO replace filter with "PartitionHub" when that is implemented
// must use a tuple here because envelope is pooled and must only be touched in the selected lane
Flow[(OptionVal[ActorRef], InboundEnvelope)].collect {
case (recipient, env) if partitionFun(recipient) == i env
}
.toMat(lane)(Keep.right))(materializer)
}(collection.breakOut)
val (testMgmtValues, completedValues) = values.unzip
if (settings.Advanced.TestMode)
testMgmtValues.foreach(testStages.add)
import system.dispatcher
val completed = Future.sequence(completedValues).map(_ Done)
completed
}
attachStreamRestart("Inbound message stream", completed, () runInboundOrdinaryMessagesStream(compression))
@ -625,21 +650,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def runInboundLargeMessagesStream(): Unit = {
val disableCompression = NoInboundCompressions // no compression on large message stream for now
val completed =
if (settings.Advanced.TestMode) {
val (mgmt, c) = aeronSource(largeStreamId, largeEnvelopePool)
.via(inboundLargeFlow(disableCompression))
.viaMat(inboundTestFlow)(Keep.right)
.toMat(inboundSink)(Keep.both)
.run()(materializer)
testStages.add(mgmt)
c
} else {
aeronSource(largeStreamId, largeEnvelopePool)
.via(inboundLargeFlow(disableCompression))
.toMat(inboundSink)(Keep.right)
.run()(materializer)
}
val (testMgmt, completed) = aeronSource(largeStreamId, largeEnvelopeBufferPool)
.via(inboundLargeFlow(disableCompression))
.toMat(inboundSink(largeEnvelopeBufferPool))(Keep.right)
.run()(materializer)
if (settings.Advanced.TestMode)
testStages.add(testMgmt)
attachStreamRestart("Inbound large message stream", completed, () runInboundLargeMessagesStream())
}
@ -753,44 +770,59 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
association(remoteAddress).quarantine(reason = "", uid.map(_.toLong))
}
def outbound(outboundContext: OutboundContext): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] =
createOutboundSink(ordinaryStreamId, outboundContext, envelopePool)
def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] =
createOutboundSink(largeStreamId, outboundContext, largeEnvelopePool)
createOutboundSink(largeStreamId, outboundContext, largeEnvelopeBufferPool)
.mapMaterializedValue { case (_, d) d }
def outbound(outboundContext: OutboundContext): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] =
createOutboundSink(ordinaryStreamId, outboundContext, envelopeBufferPool)
private def createOutboundSink(streamId: Int, outboundContext: OutboundContext,
bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = {
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout,
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval))
.viaMat(createEncoder(bufferPool))(Keep.right)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner,
envelopePool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink()))(Keep.both)
outboundLane(outboundContext, bufferPool)
.toMat(aeronSink(outboundContext, streamId))(Keep.both)
}
/**
* The outbound stream is defined as two parts to be able to add test stage in-between.
* System messages must not be dropped before the SystemMessageDelivery stage.
*/
def outboundControlPart1(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, SharedKillSwitch] = {
def aeronSink(outboundContext: OutboundContext): Sink[EnvelopeBuffer, Future[Done]] =
aeronSink(outboundContext, ordinaryStreamId)
private def aeronSink(outboundContext: OutboundContext, streamId: Int): Sink[EnvelopeBuffer, Future[Done]] = {
Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner,
envelopeBufferPool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink()))
}
def outboundLane(outboundContext: OutboundContext): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] =
outboundLane(outboundContext, envelopeBufferPool)
private def outboundLane(
outboundContext: OutboundContext,
bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = {
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout,
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout,
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval))
.viaMat(createEncoder(bufferPool))(Keep.right)
}
def outboundControl(outboundContext: OutboundContext): Sink[OutboundEnvelope, (TestManagementApi, OutboundControlIngress, Future[Done])] = {
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout,
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval))
.via(new SystemMessageDelivery(outboundContext, system.deadLetters, settings.Advanced.SystemMessageResendInterval,
settings.Advanced.SysMsgBufferSize))
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
}
def outboundControlPart2(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = {
Flow[OutboundEnvelope]
.viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right)
.via(createEncoder(envelopePool))
// note that System messages must not be dropped before the SystemMessageDelivery stage
.viaMat(outboundTestFlow(outboundContext))(Keep.right)
.viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.both)
.via(createEncoder(envelopeBufferPool))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner,
envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both)
envelopeBufferPool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both)
.mapMaterializedValue {
case ((a, b), c) (a, b, c)
}
// TODO we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
}
private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions =
@ -819,34 +851,41 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
inboundEnvelopePool))
}
def decoder(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
createDecoder(compression, envelopePool)
def createDeserializer(bufferPool: EnvelopeBufferPool): Flow[InboundEnvelope, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Deserializer(this, system, bufferPool))
def inboundSink: Sink[InboundEnvelope, Future[Done]] =
def inboundSink(bufferPool: EnvelopeBufferPool): Sink[InboundEnvelope, (TestManagementApi, Future[Done])] =
Flow[InboundEnvelope]
.via(createDeserializer(bufferPool))
.viaMat(new InboundTestStage(this, settings.Advanced.TestMode))(Keep.right)
.via(new InboundHandshake(this, inControlStream = false))
.via(new InboundQuarantineCheck(this))
.toMat(messageDispatcherSink)(Keep.right)
.toMat(messageDispatcherSink)(Keep.both)
def inboundFlow(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = {
Flow[EnvelopeBuffer]
.via(killSwitch.flow)
.via(decoder(compression))
.via(createDecoder(compression, envelopeBufferPool))
}
def inboundLargeFlow(compression: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = {
Flow[EnvelopeBuffer]
.via(killSwitch.flow)
.via(createDecoder(compression, largeEnvelopePool))
.via(createDecoder(compression, largeEnvelopeBufferPool))
}
def inboundControlSink: Sink[InboundEnvelope, (ControlMessageSubject, Future[Done])] = {
def inboundControlSink: Sink[InboundEnvelope, (TestManagementApi, ControlMessageSubject, Future[Done])] = {
Flow[InboundEnvelope]
.via(createDeserializer(envelopeBufferPool))
.viaMat(new InboundTestStage(this, settings.Advanced.TestMode))(Keep.right)
.via(new InboundHandshake(this, inControlStream = true))
.via(new InboundQuarantineCheck(this))
.viaMat(new InboundControlJunction)(Keep.right)
.viaMat(new InboundControlJunction)(Keep.both)
.via(new SystemMessageAcker(this))
.toMat(messageDispatcherSink)(Keep.both)
.mapMaterializedValue {
case ((a, b), c) (a, b, c)
}
}
private def initializeFlightRecorder(): Option[(FileChannel, File, FlightRecorder)] = {
@ -861,11 +900,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
None
}
def inboundTestFlow: Flow[InboundEnvelope, InboundEnvelope, TestManagementApi] =
Flow.fromGraph(new InboundTestStage(this))
def outboundTestFlow(association: Association): Flow[OutboundEnvelope, OutboundEnvelope, TestManagementApi] =
Flow.fromGraph(new OutboundTestStage(association))
def outboundTestFlow(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, TestManagementApi] =
Flow.fromGraph(new OutboundTestStage(outboundContext, settings.Advanced.TestMode))
/** INTERNAL API: for testing only. */
private[remote] def triggerCompressionAdvertisements(actorRef: Boolean, manifest: Boolean) = {
@ -888,9 +924,6 @@ private[remote] object ArteryTransport {
val ProtocolName = "artery"
val Version = 0
val MaximumFrameSize = 1024 * 1024
val MaximumPooledBuffers = 256
val MaximumLargeFrameSize = MaximumFrameSize * 5
/**
* Internal API

View file

@ -42,6 +42,7 @@ import akka.util.{ Unsafe, WildcardIndex }
import akka.util.OptionVal
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import akka.remote.artery.compress.CompressionProtocol._
import akka.stream.scaladsl.MergeHub
/**
* INTERNAL API
@ -50,6 +51,10 @@ private[remote] object Association {
final case class QueueWrapper(queue: Queue[OutboundEnvelope]) extends SendQueue.ProducerApi[OutboundEnvelope] {
override def offer(message: OutboundEnvelope): Boolean = queue.offer(message)
}
final val ControlQueueIndex = 0
final val LargeQueueIndex = 1
final val OrdinaryQueueIndex = 2
}
/**
@ -70,11 +75,6 @@ private[remote] class Association(
import Association._
private val log = Logging(transport.system, getClass.getName)
private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize
// FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue
// such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption
private val queueSize = 3072
private val largeQueueSize = 256
private val restartCounter = new RestartCounter(transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout)
@ -85,30 +85,57 @@ private[remote] class Association(
def createQueue(capacity: Int): Queue[OutboundEnvelope] =
new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity)
@volatile private[this] var queue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(queueSize))
@volatile private[this] var largeQueue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(largeQueueSize))
@volatile private[this] var controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(controlQueueSize))
private val outboundLanes = transport.settings.Advanced.OutboundLanes
private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize
// FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue
// such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption
private val queueSize = 3072
private val largeQueueSize = 256
private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = Array.ofDim(2 + outboundLanes)
queues(ControlQueueIndex) = QueueWrapper(createQueue(controlQueueSize)) // control stream
queues(LargeQueueIndex) = QueueWrapper(createQueue(largeQueueSize)) // large messages stream
(0 until outboundLanes).foreach { i
queues(OrdinaryQueueIndex + i) = QueueWrapper(createQueue(queueSize)) // ordinary messages stream
}
@volatile private[this] var queuesVisibility = false
private def controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(ControlQueueIndex)
private def largeQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(LargeQueueIndex)
@volatile private[this] var _outboundControlIngress: OutboundControlIngress = _
@volatile private[this] var materializing = new CountDownLatch(1)
@volatile private[this] var changeOutboundCompression: Option[ChangeOutboundCompression] = None
@volatile private[this] var changeOutboundCompression: Option[Vector[ChangeOutboundCompression]] = None
def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] =
def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = {
import transport.system.dispatcher
changeOutboundCompression match {
case Some(c) c.changeActorRefCompression(table)
case None Future.failed(new ChangeOutboundCompressionFailed)
case Some(c)
if (c.size == 1) c.head.changeActorRefCompression(table)
else Future.sequence(c.map(_.changeActorRefCompression(table))).map(_ Done)
case None Future.failed(new ChangeOutboundCompressionFailed)
}
}
def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] =
def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = {
import transport.system.dispatcher
changeOutboundCompression match {
case Some(c) c.changeClassManifestCompression(table)
case None Future.failed(new ChangeOutboundCompressionFailed)
case Some(c)
if (c.size == 1) c.head.changeClassManifestCompression(table)
else Future.sequence(c.map(_.changeClassManifestCompression(table))).map(_ Done)
case None Future.failed(new ChangeOutboundCompressionFailed)
}
}
def clearCompression(): Future[Done] =
def clearCompression(): Future[Done] = {
import transport.system.dispatcher
changeOutboundCompression match {
case Some(c) c.clearCompression()
case None Future.failed(new ChangeOutboundCompressionFailed)
case Some(c)
if (c.size == 1) c.head.clearCompression()
else Future.sequence(c.map(_.clearCompression())).map(_ Done)
case None Future.failed(new ChangeOutboundCompressionFailed)
}
}
private val _testStages: CopyOnWriteArrayList[TestManagementApi] = new CopyOnWriteArrayList
@ -201,6 +228,9 @@ private[remote] class Association(
def createOutboundEnvelope(): OutboundEnvelope =
outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender)
// volatile read to see latest queue array
val unused = queuesVisibility
// allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
// FIXME where is that ActorSelectionMessage check in old remoting?
if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
@ -219,9 +249,11 @@ private[remote] class Association(
val outboundEnvelope1 = createOutboundEnvelope()
if (!controlQueue.offer(outboundEnvelope1))
transport.system.deadLetters ! outboundEnvelope1
val outboundEnvelope2 = createOutboundEnvelope()
if (!queue.offer(outboundEnvelope2))
transport.system.deadLetters ! outboundEnvelope2
(0 until outboundLanes).foreach { i
val outboundEnvelope2 = createOutboundEnvelope()
if (!queues(OrdinaryQueueIndex + i).offer(outboundEnvelope2))
transport.system.deadLetters ! outboundEnvelope2
}
case _
val outboundEnvelope = createOutboundEnvelope()
val queue = selectQueue(recipient)
@ -233,30 +265,35 @@ private[remote] class Association(
log.debug("Dropping message to quarantined system {}", remoteAddress)
}
@tailrec
private def selectQueue(recipient: OptionVal[RemoteActorRef]): ProducerApi[OutboundEnvelope] = {
recipient match {
case OptionVal.Some(r)
r.cachedMessageDestinationFlag match {
case RegularDestination queue
case PriorityDestination controlQueue
case LargeDestination largeQueue
case null
val queueIndex = r.cachedSendQueueIndex match {
case -1
// only happens when messages are sent to new remote destination
// and is then cached on the RemoteActorRef
val elements = r.path.elements
if (priorityMessageDestinations.find(elements).isDefined) {
log.debug("Using priority message stream for {}", r.path)
r.cachedMessageDestinationFlag = PriorityDestination
} else if (transport.largeMessageChannelEnabled && largeMessageDestinations.find(elements).isDefined) {
log.debug("Using large message stream for {}", r.path)
r.cachedMessageDestinationFlag = LargeDestination
} else {
r.cachedMessageDestinationFlag = RegularDestination
}
selectQueue(recipient)
val idx =
if (priorityMessageDestinations.find(elements).isDefined) {
log.debug("Using priority message stream for {}", r.path)
ControlQueueIndex
} else if (transport.largeMessageChannelEnabled && largeMessageDestinations.find(elements).isDefined) {
log.debug("Using large message stream for {}", r.path)
LargeQueueIndex
} else if (outboundLanes == 1) {
OrdinaryQueueIndex
} else {
// select lane based on destination, to preserve message order
OrdinaryQueueIndex + (math.abs(r.path.uid) % outboundLanes)
}
r.cachedSendQueueIndex = idx
idx
case idx idx
}
case OptionVal.None queue
queues(queueIndex)
case OptionVal.None
queues(OrdinaryQueueIndex)
}
}
@ -333,29 +370,22 @@ private[remote] class Association(
// using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress
materializing = new CountDownLatch(1)
val wrapper = getOrCreateQueueWrapper(controlQueue, queueSize)
controlQueue = wrapper // use new underlying queue immediately for restarts
val wrapper = getOrCreateQueueWrapper(ControlQueueIndex, queueSize)
queues(ControlQueueIndex) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array
val (queueValue, (control, completed)) =
if (transport.settings.Advanced.TestMode) {
val ((queueValue, mgmt), (control, completed)) =
Source.fromGraph(new SendQueue[OutboundEnvelope])
.via(transport.outboundControlPart1(this))
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.toMat(transport.outboundControlPart2(this))(Keep.both)
.run()(materializer)
_testStages.add(mgmt)
(queueValue, (control, completed))
} else {
Source.fromGraph(new SendQueue[OutboundEnvelope])
.via(transport.outboundControlPart1(this))
.toMat(transport.outboundControlPart2(this))(Keep.both)
.run()(materializer)
}
val (queueValue, (testMgmt, control, completed)) =
Source.fromGraph(new SendQueue[OutboundEnvelope])
.toMat(transport.outboundControl(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
controlQueue = queueValue
queues(ControlQueueIndex) = queueValue
queuesVisibility = true // volatile write for visibility of the queues array
_outboundControlIngress = control
materializing.countDown()
attachStreamRestart("Outbound control stream", completed, cause {
@ -367,61 +397,103 @@ private[remote] class Association(
})
}
private def getOrCreateQueueWrapper(q: SendQueue.ProducerApi[OutboundEnvelope], capacity: Int): QueueWrapper =
q match {
private def getOrCreateQueueWrapper(queueIndex: Int, capacity: Int): QueueWrapper = {
val unused = queuesVisibility // volatile read to see latest queues array
queues(queueIndex) match {
case existing: QueueWrapper existing
case _
// use new queue for restarts
QueueWrapper(createQueue(capacity))
}
}
private def runOutboundOrdinaryMessagesStream(): Unit = {
val wrapper = getOrCreateQueueWrapper(queue, queueSize)
queue = wrapper // use new underlying queue immediately for restarts
if (outboundLanes == 1) {
val queueIndex = OrdinaryQueueIndex
val wrapper = getOrCreateQueueWrapper(queueIndex, queueSize)
queues(queueIndex) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array
val (queueValue, (changeCompression, completed)) =
if (transport.settings.Advanced.TestMode) {
val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
val ((queueValue, testMgmt), (changeCompression, completed)) =
Source.fromGraph(new SendQueue[OutboundEnvelope])
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.toMat(transport.outbound(this))(Keep.both)
.run()(materializer)
_testStages.add(mgmt)
(queueValue, completed)
} else {
Source.fromGraph(new SendQueue[OutboundEnvelope])
.toMat(transport.outbound(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
queues(queueIndex) = queueValue
queuesVisibility = true // volatile write for visibility of the queues array
changeOutboundCompression = Some(Vector(changeCompression))
attachStreamRestart("Outbound message stream", completed, _ runOutboundOrdinaryMessagesStream())
} else {
val wrappers = (0 until outboundLanes).map { i
val wrapper = getOrCreateQueueWrapper(OrdinaryQueueIndex + i, queueSize)
queues(OrdinaryQueueIndex + i) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array
wrapper
}.toVector
val lane = Source.fromGraph(new SendQueue[OutboundEnvelope])
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.viaMat(transport.outboundLane(this))(Keep.both)
.watchTermination()(Keep.both)
.mapMaterializedValue {
case (((q, m), c), w) ((q, m), (c, w))
}
val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer].toMat(transport.aeronSink(this))(Keep.both).run()(materializer)
val values: Vector[((SendQueue.QueueValue[OutboundEnvelope], TestManagementApi), (Encoder.ChangeOutboundCompression, Future[Done]))] =
(0 until outboundLanes).map { _
lane.to(mergeHub).run()(materializer)
}(collection.breakOut)
val (a, b) = values.unzip
val (queueValues, testMgmtValues) = a.unzip
val (changeCompressionValues, laneCompletedValues) = b.unzip
if (transport.settings.Advanced.TestMode)
testMgmtValues.foreach(_testStages.add)
import transport.system.dispatcher
val completed = Future.sequence(laneCompletedValues).flatMap(_ aeronSinkCompleted)
queueValues.zip(wrappers).zipWithIndex.foreach {
case ((q, w), i)
q.inject(w.queue)
queues(OrdinaryQueueIndex + i) = q // replace with the materialized value, still same underlying queue
}
queuesVisibility = true // volatile write for visibility of the queues array
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
queue = queueValue
changeOutboundCompression = Some(changeCompression)
changeOutboundCompression = Some(changeCompressionValues)
attachStreamRestart("Outbound message stream", completed, _ runOutboundOrdinaryMessagesStream())
attachStreamRestart("Outbound message stream", completed, _ runOutboundOrdinaryMessagesStream())
}
}
private def runOutboundLargeMessagesStream(): Unit = {
val wrapper = getOrCreateQueueWrapper(largeQueue, largeQueueSize)
largeQueue = wrapper // use new underlying queue immediately for restarts
val wrapper = getOrCreateQueueWrapper(LargeQueueIndex, largeQueueSize)
queues(LargeQueueIndex) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array
val (queueValue, completed) =
if (transport.settings.Advanced.TestMode) {
val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.toMat(transport.outboundLarge(this))(Keep.both)
.run()(materializer)
_testStages.add(mgmt)
(queueValue, completed)
} else {
Source.fromGraph(new SendQueue[OutboundEnvelope])
.toMat(transport.outboundLarge(this))(Keep.both)
.run()(materializer)
}
val ((queueValue, testMgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.toMat(transport.outboundLarge(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
largeQueue = queueValue
queues(LargeQueueIndex) = queueValue
queuesVisibility = true // volatile write for visibility of the queues array
attachStreamRestart("Outbound large message stream", completed, _ runOutboundLargeMessagesStream())
}

View file

@ -21,6 +21,7 @@ import akka.remote.artery.compress.CompressionTable
import akka.Done
import akka.stream.stage.GraphStageWithMaterializedValue
import scala.concurrent.Promise
import java.util.concurrent.atomic.AtomicInteger
/**
* INTERNAL API
@ -34,6 +35,7 @@ private[remote] object Encoder {
private[remote] class ChangeOutboundCompressionFailed extends RuntimeException(
"Change of outbound compression table failed (will be retried), because materialization did not complete yet")
}
/**
@ -212,7 +214,6 @@ private[remote] class Decoder(
import Decoder.RetryResolveRemoteDeployedRecipient
private val localAddress = inboundContext.localAddress.address
private val headerBuilder = HeaderBuilder.in(compression)
private val serialization = SerializationExtension(system)
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
private val retryResolveRemoteDeployedRecipientAttempts = 20
@ -284,35 +285,24 @@ private[remote] class Decoder(
// --- end of hit refs and manifests for heavy-hitter counting
}
try {
val deserializedMessage = MessageSerializer.deserializeForArtery(
system, originUid, serialization, headerBuilder.serializer, classManifest, envelope)
val decoded = inEnvelopePool.acquire().init(
recipient,
localAddress, // FIXME: Is this needed anymore? What should we do here?
sender,
originUid,
headerBuilder.serializer,
classManifest,
envelope,
association)
val decoded = inEnvelopePool.acquire().init(
recipient,
localAddress, // FIXME: Is this needed anymore? What should we do here?
deserializedMessage,
sender, // FIXME: No need for an option, decode simply to deadLetters instead
originUid,
association)
if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
// the remote deployed actor might not be created yet when resolving the
// recipient for the first message that is sent to it, best effort retry
scheduleOnce(RetryResolveRemoteDeployedRecipient(
retryResolveRemoteDeployedRecipientAttempts,
headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) // FIXME IS THIS SAFE?
} else
push(out, decoded)
} catch {
case NonFatal(e)
log.warning(
"Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
headerBuilder.serializer, classManifest, e.getMessage)
pull(in)
} finally {
bufferPool.release(envelope)
}
if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
// the remote deployed actor might not be created yet when resolving the
// recipient for the first message that is sent to it, best effort retry
scheduleOnce(RetryResolveRemoteDeployedRecipient(
retryResolveRemoteDeployedRecipientAttempts,
headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) // FIXME IS THIS SAFE?
} else
push(out, decoded)
}
private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
@ -369,3 +359,48 @@ private[remote] class Decoder(
}
}
/**
* INTERNAL API
*/
private[remote] class Deserializer(
inboundContext: InboundContext,
system: ExtendedActorSystem,
bufferPool: EnvelopeBufferPool) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
val in: Inlet[InboundEnvelope] = Inlet("Artery.Deserializer.in")
val out: Outlet[InboundEnvelope] = Outlet("Artery.Deserializer.out")
val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
private val serialization = SerializationExtension(system)
override protected def logSource = classOf[Deserializer]
override def onPush(): Unit = {
val envelope = grab(in)
try {
val deserializedMessage = MessageSerializer.deserializeForArtery(
system, envelope.originUid, serialization, envelope.serializer, envelope.classManifest, envelope.envelopeBuffer)
push(out, envelope.withMessage(deserializedMessage))
} catch {
case NonFatal(e)
log.warning(
"Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
envelope.serializer, envelope.classManifest, e.getMessage)
pull(in)
} finally {
val buf = envelope.envelopeBuffer
envelope.releaseEnvelopeBuffer()
bufferPool.release(buf)
}
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}

View file

@ -11,7 +11,10 @@ import akka.actor.ActorRef
/**
* INTERNAL API
*/
private[akka] object InboundEnvelope {
private[remote] object InboundEnvelope {
/**
* Only used in tests
*/
def apply(
recipient: OptionVal[InternalActorRef],
recipientAddress: Address,
@ -20,7 +23,8 @@ private[akka] object InboundEnvelope {
originUid: Long,
association: OptionVal[OutboundContext]): InboundEnvelope = {
val env = new ReusableInboundEnvelope
env.init(recipient, recipientAddress, message, sender, originUid, association)
env.init(recipient, recipientAddress, sender, originUid, -1, "", null, association)
.withMessage(message)
}
}
@ -28,23 +32,29 @@ private[akka] object InboundEnvelope {
/**
* INTERNAL API
*/
private[akka] trait InboundEnvelope {
private[remote] trait InboundEnvelope {
def recipient: OptionVal[InternalActorRef]
def recipientAddress: Address
def message: AnyRef
def sender: OptionVal[ActorRef]
def originUid: Long
def association: OptionVal[OutboundContext]
def serializer: Int
def classManifest: String
def message: AnyRef
def envelopeBuffer: EnvelopeBuffer
def withMessage(message: AnyRef): InboundEnvelope
def releaseEnvelopeBuffer(): InboundEnvelope
def withRecipient(ref: InternalActorRef): InboundEnvelope
}
/**
* INTERNAL API
*/
private[akka] object ReusableInboundEnvelope {
private[remote] object ReusableInboundEnvelope {
def createObjectPool(capacity: Int) = new ObjectPool[ReusableInboundEnvelope](
capacity,
create = () new ReusableInboundEnvelope, clear = inEnvelope inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear())
@ -56,23 +66,34 @@ private[akka] object ReusableInboundEnvelope {
private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
private var _recipient: OptionVal[InternalActorRef] = OptionVal.None
private var _recipientAddress: Address = null
private var _message: AnyRef = null
private var _sender: OptionVal[ActorRef] = OptionVal.None
private var _originUid: Long = 0L
private var _association: OptionVal[OutboundContext] = OptionVal.None
private var _serializer: Int = -1
private var _classManifest: String = null
private var _message: AnyRef = null
private var _envelopeBuffer: EnvelopeBuffer = null
override def recipient: OptionVal[InternalActorRef] = _recipient
override def recipientAddress: Address = _recipientAddress
override def message: AnyRef = _message
override def sender: OptionVal[ActorRef] = _sender
override def originUid: Long = _originUid
override def association: OptionVal[OutboundContext] = _association
override def serializer: Int = _serializer
override def classManifest: String = _classManifest
override def message: AnyRef = _message
override def envelopeBuffer: EnvelopeBuffer = _envelopeBuffer
override def withMessage(message: AnyRef): InboundEnvelope = {
_message = message
this
}
def releaseEnvelopeBuffer(): InboundEnvelope = {
_envelopeBuffer = null
this
}
def withRecipient(ref: InternalActorRef): InboundEnvelope = {
_recipient = OptionVal(ref)
this
@ -90,15 +111,19 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
def init(
recipient: OptionVal[InternalActorRef],
recipientAddress: Address,
message: AnyRef,
sender: OptionVal[ActorRef],
originUid: Long,
serializer: Int,
classManifest: String,
envelopeBuffer: EnvelopeBuffer,
association: OptionVal[OutboundContext]): InboundEnvelope = {
_recipient = recipient
_recipientAddress = recipientAddress
_message = message
_sender = sender
_originUid = originUid
_serializer = serializer
_classManifest = classManifest
_envelopeBuffer = envelopeBuffer
_association = association
this
}

View file

@ -24,6 +24,7 @@ import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import akka.util.OptionVal
import akka.stream.stage.GraphStageLogic
/**
* INTERNAL API
@ -49,6 +50,11 @@ private[remote] class TestManagementApiImpl(stopped: Future[Done], callback: Asy
}
}
private[remote] class DisabledTestManagementApi extends TestManagementApi {
override def send(command: Any)(implicit ec: ExecutionContext): Future[Done] =
Future.failed(new RuntimeException("TestStage is disabled, enable with MultiNodeConfig.testTransport"))
}
/**
* INTERNAL API
*/
@ -57,61 +63,70 @@ private[remote] final case class TestManagementMessage(command: Any, done: Promi
/**
* INTERNAL API
*/
private[remote] class OutboundTestStage(outboundContext: OutboundContext)
private[remote] class OutboundTestStage(outboundContext: OutboundContext, enabled: Boolean)
extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, OutboundEnvelope], TestManagementApi] {
val in: Inlet[OutboundEnvelope] = Inlet("OutboundTestStage.in")
val out: Outlet[OutboundEnvelope] = Outlet("OutboundTestStage.out")
override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stoppedPromise = Promise[Done]()
if (enabled) {
val stoppedPromise = Promise[Done]()
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new TimerGraphStageLogic(shape) with CallbackWrapper[TestManagementMessage] with InHandler with OutHandler with StageLogging {
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new TimerGraphStageLogic(shape) with CallbackWrapper[TestManagementMessage] with InHandler with OutHandler with StageLogging {
private var blackhole = Set.empty[Address]
private var blackhole = Set.empty[Address]
private val callback = getAsyncCallback[TestManagementMessage] {
case TestManagementMessage(command, done)
command match {
case SetThrottle(address, Direction.Send | Direction.Both, Blackhole)
log.info("blackhole outbound messages to {}", address)
blackhole += address
case SetThrottle(address, Direction.Send | Direction.Both, Unthrottled)
log.info("accept outbound messages to {}", address)
blackhole -= address
case _ // not interested
}
done.success(Done)
private val callback = getAsyncCallback[TestManagementMessage] {
case TestManagementMessage(command, done)
command match {
case SetThrottle(address, Direction.Send | Direction.Both, Blackhole)
log.info("blackhole outbound messages to {}", address)
blackhole += address
case SetThrottle(address, Direction.Send | Direction.Both, Unthrottled)
log.info("accept outbound messages to {}", address)
blackhole -= address
case _ // not interested
}
done.success(Done)
}
override def preStart(): Unit = {
initCallback(callback.invoke)
}
override def postStop(): Unit = stoppedPromise.success(Done)
// InHandler
override def onPush(): Unit = {
val env = grab(in)
if (blackhole(outboundContext.remoteAddress)) {
log.debug(
"dropping outbound message [{}] to [{}] because of blackhole",
env.message.getClass.getName, outboundContext.remoteAddress)
pull(in) // drop message
} else
push(out, env)
}
// OutHandler
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def preStart(): Unit = {
initCallback(callback.invoke)
val managementApi: TestManagementApi = new TestManagementApiImpl(stoppedPromise.future, logic)
(logic, managementApi)
} else {
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def postStop(): Unit = stoppedPromise.success(Done)
// InHandler
override def onPush(): Unit = {
val env = grab(in)
if (blackhole(outboundContext.remoteAddress)) {
log.debug(
"dropping outbound message [{}] to [{}] because of blackhole",
env.message.getClass.getName, outboundContext.remoteAddress)
pull(in) // drop message
} else
push(out, env)
}
// OutHandler
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
(logic, new DisabledTestManagementApi)
}
val managementApi: TestManagementApi = new TestManagementApiImpl(stoppedPromise.future, logic)
(logic, managementApi)
}
}
@ -119,67 +134,76 @@ private[remote] class OutboundTestStage(outboundContext: OutboundContext)
/**
* INTERNAL API
*/
private[remote] class InboundTestStage(inboundContext: InboundContext)
private[remote] class InboundTestStage(inboundContext: InboundContext, enabled: Boolean)
extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], TestManagementApi] {
val in: Inlet[InboundEnvelope] = Inlet("InboundTestStage.in")
val out: Outlet[InboundEnvelope] = Outlet("InboundTestStage.out")
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stoppedPromise = Promise[Done]()
if (enabled) {
val stoppedPromise = Promise[Done]()
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new TimerGraphStageLogic(shape) with CallbackWrapper[TestManagementMessage] with InHandler with OutHandler with StageLogging {
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new TimerGraphStageLogic(shape) with CallbackWrapper[TestManagementMessage] with InHandler with OutHandler with StageLogging {
private var blackhole = Set.empty[Address]
private var blackhole = Set.empty[Address]
private val callback = getAsyncCallback[TestManagementMessage] {
case TestManagementMessage(command, done)
command match {
case SetThrottle(address, Direction.Receive | Direction.Both, Blackhole)
log.info("blackhole inbound messages from {}", address)
blackhole += address
case SetThrottle(address, Direction.Receive | Direction.Both, Unthrottled)
log.info("accept inbound messages from {}", address)
blackhole -= address
case _ // not interested
}
done.success(Done)
}
override def preStart(): Unit = {
initCallback(callback.invoke)
}
override def postStop(): Unit = stoppedPromise.success(Done)
// InHandler
override def onPush(): Unit = {
val env = grab(in)
env.association match {
case OptionVal.None
// unknown, handshake not completed
push(out, env)
case OptionVal.Some(association)
if (blackhole(association.remoteAddress)) {
log.debug(
"dropping inbound message [{}] from [{}] with UID [{}] because of blackhole",
env.message.getClass.getName, association.remoteAddress, env.originUid)
pull(in) // drop message
} else
push(out, env)
private val callback = getAsyncCallback[TestManagementMessage] {
case TestManagementMessage(command, done)
command match {
case SetThrottle(address, Direction.Receive | Direction.Both, Blackhole)
log.info("blackhole inbound messages from {}", address)
blackhole += address
case SetThrottle(address, Direction.Receive | Direction.Both, Unthrottled)
log.info("accept inbound messages from {}", address)
blackhole -= address
case _ // not interested
}
done.success(Done)
}
override def preStart(): Unit = {
initCallback(callback.invoke)
}
override def postStop(): Unit = stoppedPromise.success(Done)
// InHandler
override def onPush(): Unit = {
val env = grab(in)
env.association match {
case OptionVal.None
// unknown, handshake not completed
push(out, env)
case OptionVal.Some(association)
if (blackhole(association.remoteAddress)) {
log.debug(
"dropping inbound message [{}] from [{}] with UID [{}] because of blackhole",
env.message.getClass.getName, association.remoteAddress, env.originUid)
pull(in) // drop message
} else
push(out, env)
}
}
// OutHandler
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
// OutHandler
override def onPull(): Unit = pull(in)
val managementApi: TestManagementApi = new TestManagementApiImpl(stoppedPromise.future, logic)
setHandlers(in, out, this)
(logic, managementApi)
} else {
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
(logic, new DisabledTestManagementApi)
}
val managementApi: TestManagementApi = new TestManagementApiImpl(stoppedPromise.future, logic)
(logic, managementApi)
}
}

View file

@ -39,7 +39,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender {
r
}
val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
val pool = new EnvelopeBufferPool(1034 * 1024, 128)
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)

View file

@ -10,6 +10,7 @@ import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.testkit.SocketUtil
import akka.testkit.TestActors
import com.typesafe.config.ConfigFactory
import akka.testkit.TestProbe
object HandshakeFailureSpec {
@ -22,7 +23,7 @@ object HandshakeFailureSpec {
remote.artery.enabled = on
remote.artery.hostname = localhost
remote.artery.port = 0
remote.handshake-timeout = 2s
remote.artery.advanced.handshake-timeout = 2s
}
""")

View file

@ -4,7 +4,7 @@
package akka.remote.artery
import akka.actor.{ Actor, ActorRef, ActorSelection, Props, RootActorPath }
import akka.remote.{ LargeDestination, RARP, RegularDestination, RemoteActorRef }
import akka.remote.{ RARP, RemoteActorRef }
import akka.testkit.TestProbe
import akka.util.ByteString
@ -51,7 +51,7 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec(
senderProbeA.expectMsg(Pong(0))
// flag should be cached now
regularRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(RegularDestination)
regularRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should be >= (Association.OrdinaryQueueIndex)
}
@ -75,7 +75,7 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec(
senderProbeA.expectMsg(Pong(0))
// flag should be cached now
largeRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(LargeDestination)
largeRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should ===(Association.LargeQueueIndex)
}
@ -112,8 +112,8 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec(
remoteProbe.expectMsg(10.seconds, Pong(largeBytes))
// cached flags should be set now
largeRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(LargeDestination)
regularRemote.asInstanceOf[RemoteActorRef].cachedMessageDestinationFlag should ===(RegularDestination)
largeRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should ===(Association.LargeQueueIndex)
regularRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should be >= (Association.OrdinaryQueueIndex)
}
}

View file

@ -21,7 +21,6 @@ object RemoteMessageSerializationSpec {
case s if sender().path == another.path one ! s
}
}
val maxPayloadBytes = ArteryTransport.MaximumFrameSize
}
class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec("""
@ -31,6 +30,8 @@ class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec("""
import RemoteMessageSerializationSpec._
val maxPayloadBytes = RARP(system).provider.remoteSettings.Artery.Advanced.MaximumFrameSize
val remoteSystem = newRemoteSystem()
val remotePort = port(remoteSystem)

View file

@ -12,6 +12,8 @@ import akka.remote.RARP
import akka.testkit.TestActors
import akka.actor.PoisonPill
import akka.testkit.TestProbe
import akka.actor.ActorRef
import com.typesafe.config.Config
object RemoteSendConsistencySpec {
@ -26,7 +28,15 @@ object RemoteSendConsistencySpec {
}
class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.config) with ImplicitSender {
class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(RemoteSendConsistencySpec.config)
class RemoteSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(
ConfigFactory.parseString("""
akka.remote.artery.advanced.outbound-lanes = 3
akka.remote.artery.advanced.inbound-lanes = 3
""").withFallback(RemoteSendConsistencySpec.config))
abstract class AbstractRemoteSendConsistencySpec(config: Config) extends AkkaSpec(config) with ImplicitSender {
val systemB = ActorSystem("systemB", system.settings.config)
val addressB = RARP(systemB).provider.getDefaultAddress
@ -78,18 +88,24 @@ class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.confi
}
"be able to send messages concurrently preserving order" in {
val actorOnSystemB = systemB.actorOf(Props(new Actor {
def receive = {
case i: Int sender() ! i
}
}), "echo2")
systemB.actorOf(TestActors.echoActorProps, "echoA")
systemB.actorOf(TestActors.echoActorProps, "echoB")
systemB.actorOf(TestActors.echoActorProps, "echoC")
val remoteRef = {
system.actorSelection(rootB / "user" / "echo2") ! Identify(None)
val remoteRefA = {
system.actorSelection(rootB / "user" / "echoA") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
val remoteRefB = {
system.actorSelection(rootB / "user" / "echoB") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
val remoteRefC = {
system.actorSelection(rootB / "user" / "echoC") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
val senderProps = Props(new Actor {
def senderProps(remoteRef: ActorRef) = Props(new Actor {
var counter = 1000
remoteRef ! counter
@ -106,10 +122,10 @@ class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.confi
}
}).withDeploy(Deploy.local)
system.actorOf(senderProps)
system.actorOf(senderProps)
system.actorOf(senderProps)
system.actorOf(senderProps)
system.actorOf(senderProps(remoteRefA))
system.actorOf(senderProps(remoteRefB))
system.actorOf(senderProps(remoteRefC))
system.actorOf(senderProps(remoteRefA))
within(10.seconds) {
expectMsg("success")

View file

@ -20,7 +20,7 @@ import akka.util.OptionVal
import akka.actor.InternalActorRef
import akka.dispatch.ExecutionContexts
private[akka] class TestInboundContext(
private[remote] class TestInboundContext(
override val localAddress: UniqueAddress,
val controlSubject: TestControlMessageSubject = new TestControlMessageSubject,
val controlProbe: Option[ActorRef] = None,
@ -61,7 +61,7 @@ private[akka] class TestInboundContext(
new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe)
}
private[akka] class TestOutboundContext(
private[remote] class TestOutboundContext(
override val localAddress: UniqueAddress,
override val remoteAddress: Address,
override val controlSubject: TestControlMessageSubject,
@ -96,7 +96,7 @@ private[akka] class TestOutboundContext(
}
private[akka] class TestControlMessageSubject extends ControlMessageSubject {
private[remote] class TestControlMessageSubject extends ControlMessageSubject {
private val observers = new CopyOnWriteArrayList[ControlMessageObserver]
@ -119,7 +119,7 @@ private[akka] class TestControlMessageSubject extends ControlMessageSubject {
}
private[akka] class ManualReplyInboundContext(
private[remote] class ManualReplyInboundContext(
replyProbe: ActorRef,
localAddress: UniqueAddress,
controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) {

View file

@ -28,7 +28,7 @@ object CompressionIntegrationSpec {
remote.artery.enabled = on
remote.artery.hostname = localhost
remote.artery.port = 0
remote.handshake-timeout = 10s
remote.artery.advanced.handshake-timeout = 10s
remote.artery.advanced.compression {
actor-refs.advertisement-interval = 3 seconds

View file

@ -30,7 +30,7 @@ object HandshakeShouldDropCompressionTableSpec {
remote.artery.enabled = on
remote.artery.hostname = localhost
remote.artery.port = 0
remote.handshake-timeout = 10s
remote.artery.advanced.handshake-timeout = 10s
remote.artery.advanced.compression {
actor-refs {