diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 0454c0e1c1..e7e4233284 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -19,7 +19,6 @@ import scala.concurrent.duration._ import akka.NotUsed import akka.actor._ import akka.remote.AddressUidExtension -import akka.remote.EndpointManager.Send import akka.remote.RARP import akka.remote.RemoteActorRef import akka.remote.UniqueAddress @@ -54,11 +53,9 @@ class CodecBenchmark { implicit val system = ActorSystem("CodecBenchmark", config) val systemB = ActorSystem("systemB", system.settings.config) - val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) - val inboundEnvelopePool = new ObjectPool[InboundEnvelope]( - 16, - create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear() - ) + private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) + private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) val compressionOut = NoOutboundCompressions val headerIn = HeaderBuilder.in(NoopInboundCompressions) @@ -139,11 +136,11 @@ class CodecBenchmark { val latch = new CountDownLatch(1) val N = 100000 - val encoder: Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, envelopePool)) + val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, outboundEnvelopePool, envelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) - .map(_ ⇒ Send(payload, OptionVal.None, remoteRefB, None)) + .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) .via(encoder) .map(envelope => envelopePool.release(envelope)) .runWith(new LatchSink(N, latch))(materializer) @@ -180,9 +177,9 @@ class CodecBenchmark { envelope } .via(decoder) - .map { env => - inboundEnvelopePool.release(env) - () + .map { + case env: ReusableInboundEnvelope => inboundEnvelopePool.release(env) + case _ => } .runWith(new LatchSink(N, latch))(materializer) @@ -196,8 +193,8 @@ class CodecBenchmark { val latch = new CountDownLatch(1) val N = 100000 - val encoder: Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, envelopePool)) + val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compressionOut, outboundEnvelopePool, envelopePool)) val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) val provider = RARP(system).provider @@ -213,12 +210,12 @@ class CodecBenchmark { resolveActorRefWithLocalAddress, NoopInboundCompressions, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) - .map(_ ⇒ Send(payload, OptionVal.None, remoteRefB, None)) + .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) .via(encoder) .via(decoder) - .map { env => - inboundEnvelopePool.release(env) - () + .map { + case env: ReusableInboundEnvelope => inboundEnvelopePool.release(env) + case _ => } .runWith(new LatchSink(N, latch))(materializer) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/PerfFlamesSupport.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/PerfFlamesSupport.scala index 34311bd1b1..3ca461388c 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/PerfFlamesSupport.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/PerfFlamesSupport.scala @@ -16,10 +16,10 @@ import scala.concurrent.duration._ */ private[akka] trait PerfFlamesSupport { _: MultiNodeSpec ⇒ - /** + /** * Runs `perf-java-flames` script on given node (JVM process). * Refer to https://github.com/jrudolph/perf-map-agent for options and manual. - * + * * Options are currently to be passed in via `export PERF_MAP_OPTIONS` etc. */ def runPerfFlames(nodes: RoleName*)(delay: FiniteDuration, time: FiniteDuration = 15.seconds): Unit = { @@ -53,6 +53,5 @@ private[akka] trait PerfFlamesSupport { _: MultiNodeSpec ⇒ if (!isIt) println(s"WARN: perf-java-flames not available under [$perfJavaFlamesPath]! Skipping perf profiling.") isIt } - } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 121a166dc6..2bdedd2681 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -205,7 +205,7 @@ class MaxThroughputSpecMultiJvmNode2 extends MaxThroughputSpec abstract class MaxThroughputSpec extends MultiNodeSpec(MaxThroughputSpec) - with STMultiNodeSpec with ImplicitSender + with STMultiNodeSpec with ImplicitSender with PerfFlamesSupport { import MaxThroughputSpec._ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 9561ac1d55..ac83fc7d54 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -19,6 +19,7 @@ import akka.ConfigurationException import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.remote.artery.ArteryTransport import akka.util.OptionVal +import akka.remote.artery.OutboundEnvelope /** * INTERNAL API @@ -96,6 +97,10 @@ private[akka] object RemoteActorRefProvider { // else ignore: it is a reliably delivered message that might be retried later, and it has not yet deserved // the dead letter status if (seqOpt.isEmpty) super.!(m)(senderOption.orNull) + case env: OutboundEnvelope ⇒ + super.!(env.message)(env.sender.orNull) + case DeadLetter(env: OutboundEnvelope, _, _) ⇒ + super.!(env.message)(env.sender.orNull) case _ ⇒ super.!(message)(sender) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 800d1fd99d..5daec306a9 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -21,7 +21,6 @@ import akka.actor.Cancellable import akka.event.Logging import akka.event.LoggingAdapter import akka.remote.AddressUidExtension -import akka.remote.EndpointManager.Send import akka.remote.EventPublisher import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider @@ -72,96 +71,6 @@ import scala.util.control.NonFatal import akka.actor.Props import akka.actor.Actor -/** - * INTERNAL API - */ -private[akka] object InboundEnvelope { - def apply( - recipient: OptionVal[InternalActorRef], - recipientAddress: Address, - message: AnyRef, - sender: OptionVal[ActorRef], - originUid: Long, - association: OptionVal[OutboundContext]): InboundEnvelope = { - val env = new ReusableInboundEnvelope - env.init(recipient, recipientAddress, message, sender, originUid, association) - env - } - -} - -/** - * INTERNAL API - */ -private[akka] trait InboundEnvelope { - def recipient: OptionVal[InternalActorRef] - def recipientAddress: Address - def message: AnyRef - def sender: OptionVal[ActorRef] - def originUid: Long - def association: OptionVal[OutboundContext] - - def withMessage(message: AnyRef): InboundEnvelope - - def withRecipient(ref: InternalActorRef): InboundEnvelope -} - -/** - * INTERNAL API - */ -private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { - private var _recipient: OptionVal[InternalActorRef] = OptionVal.None - private var _recipientAddress: Address = null - private var _message: AnyRef = null - private var _sender: OptionVal[ActorRef] = OptionVal.None - private var _originUid: Long = 0L - private var _association: OptionVal[OutboundContext] = OptionVal.None - - override def recipient: OptionVal[InternalActorRef] = _recipient - override def recipientAddress: Address = _recipientAddress - override def message: AnyRef = _message - override def sender: OptionVal[ActorRef] = _sender - override def originUid: Long = _originUid - override def association: OptionVal[OutboundContext] = _association - - override def withMessage(message: AnyRef): InboundEnvelope = { - _message = message - this - } - - def withRecipient(ref: InternalActorRef): InboundEnvelope = { - _recipient = OptionVal(ref) - this - } - - def clear(): Unit = { - _recipient = OptionVal.None - _recipientAddress = null - _message = null - _sender = OptionVal.None - _originUid = 0L - _association = OptionVal.None - } - - def init( - recipient: OptionVal[InternalActorRef], - recipientAddress: Address, - message: AnyRef, - sender: OptionVal[ActorRef], - originUid: Long, - association: OptionVal[OutboundContext]): Unit = { - _recipient = recipient - _recipientAddress = recipientAddress - _message = message - _sender = sender - _originUid = originUid - _association = association - } - - override def toString: String = - s"InboundEnvelope($recipient, $recipientAddress, $message, $sender, $originUid, $association)" -} - /** * INTERNAL API * Inbound API that is used by the stream stages. @@ -311,8 +220,6 @@ private[akka] trait OutboundContext { */ def controlSubject: ControlMessageSubject - // FIXME we should be able to Send without a recipient ActorRef - def dummyRecipient: RemoteActorRef } /** @@ -340,7 +247,7 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati override def preStart(): Unit = { val msg = ActorSystemTerminating(inboundContext.localAddress) - associations.foreach { a ⇒ a.send(msg, OptionVal.Some(self), a.dummyRecipient) } + associations.foreach { a ⇒ a.send(msg, OptionVal.Some(self), OptionVal.None) } } override def postStop(): Unit = @@ -423,9 +330,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) private val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers) - private val inboundEnvelopePool = new ObjectPool[InboundEnvelope]( - 16, - create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear()) + private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) + // FIXME capacity of outboundEnvelopePool should probably be derived from the sendQueue capacity + // times a factor (for reasonable number of outbound streams) + private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 3072 * 2) val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder() match { case None ⇒ (None, None, None) @@ -447,7 +355,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R createFlightRecorderEventSink(synchr = true) private val associationRegistry = new AssociationRegistry( - remoteAddress ⇒ new Association(this, materializer, remoteAddress, controlSubject, largeMessageDestinations)) + remoteAddress ⇒ new Association(this, materializer, remoteAddress, controlSubject, largeMessageDestinations, + outboundEnvelopePool)) def remoteSettings: RemoteSettings = provider.remoteSettings @@ -789,7 +698,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R a2 } - a.send(message, sender, recipient) + a.send(message, sender, OptionVal.Some(recipient)) } override def association(remoteAddress: Address): Association = { @@ -813,28 +722,31 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) } - def outbound(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, Future[Done]] = { - Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) + def outbound(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = { + Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) + .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, + handshakeRetryInterval, injectHandshakeInterval)) .via(encoder(compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) } - def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, Future[Done]] = { - Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) + def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, Future[Done]] = { + Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) + .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, + handshakeRetryInterval, injectHandshakeInterval)) .via(createEncoder(largeEnvelopePool, compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) } - def outboundControl(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, (OutboundControlIngress, Future[Done])] = { - Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) + def outboundControl(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { + Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) + .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, + handshakeRetryInterval, injectHandshakeInterval)) .via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval, remoteSettings.SysMsgBufferSize)) - .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) + .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right) .via(encoder(compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both) @@ -842,17 +754,17 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } - def createEncoder(compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(localAddress, system, compression, bufferPool)) + def createEncoder(compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, bufferPool)) private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions = if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext) else new NoInboundCompressions(system) - def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(localAddress, system, compression, pool)) + def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, pool)) - def encoder(compression: OutboundCompressions): Flow[Send, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool, compression) + def encoder(compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool, compression) def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, @@ -860,7 +772,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender) - inboundEnvelopePool.release(m) + m match { + case r: ReusableInboundEnvelope ⇒ inboundEnvelopePool.release(r) + case _ ⇒ + } } def createDecoder(compression: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { @@ -915,7 +830,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def inboundTestFlow: Flow[InboundEnvelope, InboundEnvelope, TestManagementApi] = Flow.fromGraph(new InboundTestStage(this)) - def outboundTestFlow(association: Association): Flow[Send, Send, TestManagementApi] = + def outboundTestFlow(association: Association): Flow[OutboundEnvelope, OutboundEnvelope, TestManagementApi] = Flow.fromGraph(new OutboundTestStage(association)) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index af5660fd21..affceabd77 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -24,7 +24,6 @@ import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage import akka.event.Logging import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef, UniqueAddress } -import akka.remote.EndpointManager.Send import akka.remote.artery.AeronSink.GaveUpSendingException import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress @@ -45,8 +44,8 @@ import akka.remote.artery.compress.CompressionProtocol._ * INTERNAL API */ private[remote] object Association { - final case class QueueWrapper(queue: Queue[Send]) extends SendQueue.ProducerApi[Send] { - override def offer(message: Send): Boolean = queue.offer(message) + final case class QueueWrapper(queue: Queue[OutboundEnvelope]) extends SendQueue.ProducerApi[OutboundEnvelope] { + override def offer(message: OutboundEnvelope): Boolean = queue.offer(message) } } @@ -61,7 +60,8 @@ private[remote] class Association( val materializer: Materializer, override val remoteAddress: Address, override val controlSubject: ControlMessageSubject, - largeMessageDestinations: WildcardTree[NotUsed]) + largeMessageDestinations: WildcardTree[NotUsed], + outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope]) extends AbstractAssociation with OutboundContext { import Association._ @@ -84,12 +84,12 @@ private[remote] class Association( /** Accesses the currently active outbound compression. */ def compression: OutboundCompressions = associationState.outboundCompression - def createQueue(capacity: Int): Queue[Send] = - new ManyToOneConcurrentArrayQueue[Send](capacity) + def createQueue(capacity: Int): Queue[OutboundEnvelope] = + new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) - @volatile private[this] var queue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(queueSize)) - @volatile private[this] var largeQueue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(largeQueueSize)) - @volatile private[this] var controlQueue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(controlQueueSize)) + @volatile private[this] var queue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(queueSize)) + @volatile private[this] var largeQueue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(largeQueueSize)) + @volatile private[this] var controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = QueueWrapper(createQueue(controlQueueSize)) @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ @volatile private[this] var materializing = new CountDownLatch(1) @@ -165,48 +165,51 @@ private[remote] class Association( override def sendControl(message: ControlMessage): Unit = outboundControlIngress.sendControlMessage(message) - def send(message: Any, sender: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { + def send(message: Any, sender: OptionVal[ActorRef], recipient: OptionVal[RemoteActorRef]): Unit = { + def createOutboundEnvelope(): OutboundEnvelope = + outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender) + // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system // FIXME where is that ActorSelectionMessage check in old remoting? if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { - // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { case _: SystemMessage | ClearSystemMessageDelivery | _: ControlMessage ⇒ - val send = Send(message, sender, recipient, None) - if (!controlQueue.offer(send)) { + val outboundEnvelope = createOutboundEnvelope() + if (!controlQueue.offer(createOutboundEnvelope())) { quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") - transport.system.deadLetters ! send + transport.system.deadLetters ! outboundEnvelope } case _: DaemonMsgCreate ⇒ // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because // remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages. // It must also be sent over the ordinary message stream so that it arrives (and creates the // destination) before the first ordinary message arrives. - val send1 = Send(message, sender, recipient, None) - if (!controlQueue.offer(send1)) - transport.system.deadLetters ! send1 - val send2 = Send(message, sender, recipient, None) - if (!queue.offer(send2)) - transport.system.deadLetters ! send2 + val outboundEnvelope1 = createOutboundEnvelope() + if (!controlQueue.offer(outboundEnvelope1)) + transport.system.deadLetters ! outboundEnvelope1 + val outboundEnvelope2 = createOutboundEnvelope() + if (!queue.offer(outboundEnvelope2)) + transport.system.deadLetters ! outboundEnvelope2 case _ ⇒ - val send = Send(message, sender, recipient, None) + val outboundEnvelope = createOutboundEnvelope() val offerOk = if (largeMessageChannelEnabled && isLargeMessageDestination(recipient)) - largeQueue.offer(send) + largeQueue.offer(outboundEnvelope) else - queue.offer(send) + queue.offer(outboundEnvelope) if (!offerOk) - transport.system.deadLetters ! send + transport.system.deadLetters ! outboundEnvelope } } else if (log.isDebugEnabled) log.debug("Dropping message to quarantined system {}", remoteAddress) } - private def isLargeMessageDestination(recipient: ActorRef): Boolean = { + private def isLargeMessageDestination(recipient: OptionVal[RemoteActorRef]): Boolean = { recipient match { - case r: RemoteActorRef if r.cachedLargeMessageDestinationFlag ne null ⇒ r.cachedLargeMessageDestinationFlag eq LargeDestination - case r: RemoteActorRef ⇒ - if (largeMessageDestinations.find(r.path.elements.iterator).data.isEmpty) { + case OptionVal.Some(r) ⇒ + if (r.cachedLargeMessageDestinationFlag ne null) + r.cachedLargeMessageDestinationFlag eq LargeDestination + else if (largeMessageDestinations.find(r.path.elements.iterator).data.isEmpty) { r.cachedLargeMessageDestinationFlag = RegularDestination false } else { @@ -214,17 +217,10 @@ private[remote] class Association( r.cachedLargeMessageDestinationFlag = LargeDestination true } - case _ ⇒ false + case OptionVal.None ⇒ false } } - // FIXME we should be able to Send without a recipient ActorRef - override val dummyRecipient: RemoteActorRef = - try transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef] - catch { - case ex: Exception ⇒ throw new Exception("Bad dummy recipient! RemoteAddress: " + remoteAddress, ex) - } - // OutboundContext override def quarantine(reason: String): Unit = { val uid = associationState.uniqueRemoteAddressValue().map(_.uid) @@ -247,7 +243,7 @@ private[remote] class Association( // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644 transport.eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, u.toInt)) // end delivery of system messages to that incarnation after this point - send(ClearSystemMessageDelivery, OptionVal.None, dummyRecipient) + send(ClearSystemMessageDelivery, OptionVal.None, OptionVal.None) // try to tell the other system that we have quarantined it sendControl(Quarantined(localAddress, peer)) } else @@ -306,14 +302,14 @@ private[remote] class Association( val (queueValue, (control, completed)) = if (transport.remoteSettings.TestMode) { val ((queueValue, mgmt), (control, completed)) = - Source.fromGraph(new SendQueue[Send]) + Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) .toMat(transport.outboundControl(this, compression))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, (control, completed)) } else { - Source.fromGraph(new SendQueue[Send]) + Source.fromGraph(new SendQueue[OutboundEnvelope]) .toMat(transport.outboundControl(this, compression))(Keep.both) .run()(materializer) } @@ -332,7 +328,7 @@ private[remote] class Association( }) } - private def getOrCreateQueueWrapper(q: SendQueue.ProducerApi[Send], capacity: Int): QueueWrapper = + private def getOrCreateQueueWrapper(q: SendQueue.ProducerApi[OutboundEnvelope], capacity: Int): QueueWrapper = q match { case existing: QueueWrapper ⇒ existing case _ ⇒ @@ -346,14 +342,14 @@ private[remote] class Association( val (queueValue, completed) = if (transport.remoteSettings.TestMode) { - val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[Send]) + val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) .toMat(transport.outbound(this, compression))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, completed) } else { - Source.fromGraph(new SendQueue[Send]) + Source.fromGraph(new SendQueue[OutboundEnvelope]) .toMat(transport.outbound(this, compression))(Keep.both) .run()(materializer) } @@ -371,14 +367,14 @@ private[remote] class Association( val (queueValue, completed) = if (transport.remoteSettings.TestMode) { - val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[Send]) + val ((queueValue, mgmt), completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) .viaMat(transport.outboundTestFlow(this))(Keep.both) .toMat(transport.outboundLarge(this, compression))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, completed) } else { - Source.fromGraph(new SendQueue[Send]) + Source.fromGraph(new SendQueue[OutboundEnvelope]) .toMat(transport.outboundLarge(this, compression))(Keep.both) .run()(materializer) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 03577e289d..8e0d5d70ef 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -8,7 +8,6 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.actor._ import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress } -import akka.remote.EndpointManager.Send import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ @@ -21,15 +20,16 @@ import akka.stream.stage.TimerGraphStageLogic * INTERNAL API */ private[remote] class Encoder( - uniqueLocalAddress: UniqueAddress, - system: ActorSystem, - compression: OutboundCompressions, - bufferPool: EnvelopeBufferPool) - extends GraphStage[FlowShape[Send, EnvelopeBuffer]] { + uniqueLocalAddress: UniqueAddress, + system: ActorSystem, + compression: OutboundCompressions, + outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], + bufferPool: EnvelopeBufferPool) + extends GraphStage[FlowShape[OutboundEnvelope, EnvelopeBuffer]] { - val in: Inlet[Send] = Inlet("Artery.Encoder.in") + val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in") val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out") - val shape: FlowShape[Send, EnvelopeBuffer] = FlowShape(in, out) + val shape: FlowShape[OutboundEnvelope, EnvelopeBuffer] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { @@ -44,11 +44,14 @@ private[remote] class Encoder( override protected def logSource = classOf[Encoder] override def onPush(): Unit = { - val send = grab(in) + val outboundEnvelope = grab(in) val envelope = bufferPool.acquire() // internally compression is applied by the builder: - headerBuilder setRecipientActorRef send.recipient + outboundEnvelope.recipient match { + case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r + case OptionVal.None ⇒ headerBuilder.setNoRecipient() + } try { // avoiding currentTransportInformation.withValue due to thunk allocation @@ -56,12 +59,12 @@ private[remote] class Encoder( try { Serialization.currentTransportInformation.value = serializationInfo - send.senderOption match { + outboundEnvelope.sender match { case OptionVal.None ⇒ headerBuilder.setNoSender() case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s } - MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope) + MessageSerializer.serializeForArtery(serialization, outboundEnvelope.message, headerBuilder, envelope) } finally Serialization.currentTransportInformation.value = oldValue envelope.byteBuffer.flip() @@ -70,18 +73,24 @@ private[remote] class Encoder( } catch { case NonFatal(e) ⇒ bufferPool.release(envelope) - send.message match { + outboundEnvelope.message match { case _: SystemMessageEnvelope ⇒ - log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName) + log.error(e, "Failed to serialize system message [{}].", outboundEnvelope.message.getClass.getName) throw e case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒ - val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${send.recipient}: max allowed size ${envelope.byteBuffer.limit()} bytes. Message type [${send.message.getClass.getName}].") - log.error(reason, "Failed to serialize oversized message [{}].", send.message.getClass.getName) + val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${outboundEnvelope.recipient}: " + + s"max allowed size ${envelope.byteBuffer.limit()} bytes. Message type [${outboundEnvelope.message.getClass.getName}].") + log.error(reason, "Failed to serialize oversized message [{}].", outboundEnvelope.message.getClass.getName) pull(in) case _ ⇒ - log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName) + log.error(e, "Failed to serialize message [{}].", outboundEnvelope.message.getClass.getName) pull(in) } + } finally { + outboundEnvelope match { + case r: ReusableOutboundEnvelope ⇒ outboundEnvelopePool.release(r) + case _ ⇒ + } } } @@ -111,7 +120,7 @@ private[remote] class Decoder( resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, compression: InboundCompressions, // TODO has to do demuxing on remote address It would seem, as decoder does not yet know bufferPool: EnvelopeBufferPool, - inEnvelopePool: ObjectPool[InboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { + inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out") val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) @@ -168,8 +177,7 @@ private[remote] class Decoder( val deserializedMessage = MessageSerializer.deserializeForArtery( system, originUid, serialization, headerBuilder, envelope) - val decoded = inEnvelopePool.acquire() - decoded.asInstanceOf[ReusableInboundEnvelope].init( + val decoded = inEnvelopePool.acquire().init( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? deserializedMessage, diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index f22a7a5d3c..ceac57e668 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -7,7 +7,6 @@ import java.util.ArrayDeque import scala.concurrent.Future import scala.concurrent.Promise import akka.Done -import akka.remote.EndpointManager.Send import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -160,12 +159,13 @@ private[akka] object OutboundControlJunction { /** * INTERNAL API */ -private[akka] class OutboundControlJunction(outboundContext: OutboundContext) - extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundControlJunction.OutboundControlIngress] { +private[akka] class OutboundControlJunction( + outboundContext: OutboundContext, outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope]) + extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, OutboundEnvelope], OutboundControlJunction.OutboundControlIngress] { import OutboundControlJunction._ - val in: Inlet[Send] = Inlet("OutboundControlJunction.in") - val out: Outlet[Send] = Outlet("OutboundControlJunction.out") - override val shape: FlowShape[Send, Send] = FlowShape(in, out) + val in: Inlet[OutboundEnvelope] = Inlet("OutboundControlJunction.in") + val out: Outlet[OutboundEnvelope] = Outlet("OutboundControlJunction.out") + override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way @@ -174,7 +174,7 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext) private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) private val maxControlMessageBufferSize: Int = 1024 // FIXME config - private val buffer = new ArrayDeque[Send] + private val buffer = new ArrayDeque[OutboundEnvelope] override def preStart(): Unit = { initCallback(sendControlMessageCallback.invoke) @@ -207,8 +207,9 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext) } } - private def wrap(message: ControlMessage): Send = - Send(message, OptionVal.None, outboundContext.dummyRecipient, None) + private def wrap(message: ControlMessage): OutboundEnvelope = + outboundEnvelopePool.acquire().init( + recipient = OptionVal.None, message = message, sender = OptionVal.None) setHandlers(in, out, this) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 8a66226d6f..a7c8420377 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -7,7 +7,6 @@ import akka.actor.ActorSystem import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress import akka.stream.Attributes import akka.stream.FlowShape @@ -50,21 +49,23 @@ private[akka] object OutboundHandshake { * INTERNAL API */ private[akka] class OutboundHandshake( - system: ActorSystem, - outboundContext: OutboundContext, timeout: FiniteDuration, - retryInterval: FiniteDuration, injectHandshakeInterval: FiniteDuration) - extends GraphStage[FlowShape[Send, Send]] { + system: ActorSystem, + outboundContext: OutboundContext, + outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], + timeout: FiniteDuration, + retryInterval: FiniteDuration, injectHandshakeInterval: FiniteDuration) + extends GraphStage[FlowShape[OutboundEnvelope, OutboundEnvelope]] { - val in: Inlet[Send] = Inlet("OutboundHandshake.in") - val out: Outlet[Send] = Outlet("OutboundHandshake.out") - override val shape: FlowShape[Send, Send] = FlowShape(in, out) + val in: Inlet[OutboundEnvelope] = Inlet("OutboundHandshake.in") + val out: Outlet[OutboundEnvelope] = Outlet("OutboundHandshake.out") + override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { import OutboundHandshake._ private var handshakeState: HandshakeState = Start - private var pendingMessage: Send = null + private var pendingMessage: OutboundEnvelope = null private var injectHandshakeTickScheduled = false // InHandler @@ -127,7 +128,9 @@ private[akka] class OutboundHandshake( private def pushHandshakeReq(): Unit = { injectHandshakeTickScheduled = true scheduleOnce(InjectHandshakeTick, injectHandshakeInterval) - push(out, Send(HandshakeReq(outboundContext.localAddress), OptionVal.None, outboundContext.dummyRecipient, None)) + val env: OutboundEnvelope = outboundEnvelopePool.acquire().init( + recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress), sender = OptionVal.None) + push(out, env) } private def handshakeCompleted(): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala new file mode 100644 index 0000000000..2c20cb287d --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.InternalActorRef +import akka.util.OptionVal +import akka.actor.Address +import akka.actor.ActorRef + +/** + * INTERNAL API + */ +private[akka] object InboundEnvelope { + def apply( + recipient: OptionVal[InternalActorRef], + recipientAddress: Address, + message: AnyRef, + sender: OptionVal[ActorRef], + originUid: Long, + association: OptionVal[OutboundContext]): InboundEnvelope = { + val env = new ReusableInboundEnvelope + env.init(recipient, recipientAddress, message, sender, originUid, association) + } + +} + +/** + * INTERNAL API + */ +private[akka] trait InboundEnvelope { + def recipient: OptionVal[InternalActorRef] + def recipientAddress: Address + def message: AnyRef + def sender: OptionVal[ActorRef] + def originUid: Long + def association: OptionVal[OutboundContext] + + def withMessage(message: AnyRef): InboundEnvelope + + def withRecipient(ref: InternalActorRef): InboundEnvelope +} + +/** + * INTERNAL API + */ +private[akka] object ReusableInboundEnvelope { + def createObjectPool(capacity: Int) = new ObjectPool[ReusableInboundEnvelope]( + capacity, + create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear()) +} + +/** + * INTERNAL API + */ +private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { + private var _recipient: OptionVal[InternalActorRef] = OptionVal.None + private var _recipientAddress: Address = null + private var _message: AnyRef = null + private var _sender: OptionVal[ActorRef] = OptionVal.None + private var _originUid: Long = 0L + private var _association: OptionVal[OutboundContext] = OptionVal.None + + override def recipient: OptionVal[InternalActorRef] = _recipient + override def recipientAddress: Address = _recipientAddress + override def message: AnyRef = _message + override def sender: OptionVal[ActorRef] = _sender + override def originUid: Long = _originUid + override def association: OptionVal[OutboundContext] = _association + + override def withMessage(message: AnyRef): InboundEnvelope = { + _message = message + this + } + + def withRecipient(ref: InternalActorRef): InboundEnvelope = { + _recipient = OptionVal(ref) + this + } + + def clear(): Unit = { + _recipient = OptionVal.None + _recipientAddress = null + _message = null + _sender = OptionVal.None + _originUid = 0L + _association = OptionVal.None + } + + def init( + recipient: OptionVal[InternalActorRef], + recipientAddress: Address, + message: AnyRef, + sender: OptionVal[ActorRef], + originUid: Long, + association: OptionVal[OutboundContext]): InboundEnvelope = { + _recipient = recipient + _recipientAddress = recipientAddress + _message = message + _sender = sender + _originUid = originUid + _association = association + this + } + + override def toString: String = + s"InboundEnvelope($recipient, $recipientAddress, $message, $sender, $originUid, $association)" +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ObjectPool.scala b/akka-remote/src/main/scala/akka/remote/artery/ObjectPool.scala index 0020d52481..90f6f74d24 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ObjectPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ObjectPool.scala @@ -17,7 +17,7 @@ private[remote] class ObjectPool[A <: AnyRef](capacity: Int, create: () ⇒ A, c else obj } - def release(obj: A) = { + def release(obj: A): Boolean = { clear(obj) (!pool.offer(obj)) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala new file mode 100644 index 0000000000..6a1e9ba84c --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.InternalActorRef +import akka.util.OptionVal +import akka.actor.Address +import akka.actor.ActorRef +import akka.remote.RemoteActorRef + +/** + * INTERNAL API + */ +private[akka] object OutboundEnvelope { + def apply( + recipient: OptionVal[RemoteActorRef], + message: AnyRef, + sender: OptionVal[ActorRef]): OutboundEnvelope = { + val env = new ReusableOutboundEnvelope + env.init(recipient, message, sender) + } + +} + +/** + * INTERNAL API + */ +private[akka] trait OutboundEnvelope { + def recipient: OptionVal[RemoteActorRef] + def message: AnyRef + def sender: OptionVal[ActorRef] + + def withMessage(message: AnyRef): OutboundEnvelope + + def copy(): OutboundEnvelope +} + +/** + * INTERNAL API + */ +private[akka] object ReusableOutboundEnvelope { + def createObjectPool(capacity: Int) = new ObjectPool[ReusableOutboundEnvelope]( + capacity, + create = () ⇒ new ReusableOutboundEnvelope, clear = outEnvelope ⇒ outEnvelope.clear()) +} + +/** + * INTERNAL API + */ +private[akka] final class ReusableOutboundEnvelope extends OutboundEnvelope { + private var _recipient: OptionVal[RemoteActorRef] = OptionVal.None + private var _message: AnyRef = null + private var _sender: OptionVal[ActorRef] = OptionVal.None + + override def recipient: OptionVal[RemoteActorRef] = _recipient + override def message: AnyRef = _message + override def sender: OptionVal[ActorRef] = _sender + + override def withMessage(message: AnyRef): OutboundEnvelope = { + _message = message + this + } + + def copy(): OutboundEnvelope = + (new ReusableOutboundEnvelope).init(_recipient, _message, _sender) + + def clear(): Unit = { + _recipient = OptionVal.None + _message = null + _sender = OptionVal.None + } + + def init( + recipient: OptionVal[RemoteActorRef], + message: AnyRef, + sender: OptionVal[ActorRef]): OutboundEnvelope = { + _recipient = recipient + _message = message + _sender = sender + this + } + + override def toString: String = + s"OutboundEnvelope($recipient, $message, $sender)" +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 320c00603b..a78dc3d18b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -10,7 +10,6 @@ import scala.util.Failure import scala.util.Success import scala.util.Try import akka.Done -import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.stream.Attributes @@ -48,21 +47,21 @@ private[akka] class SystemMessageDelivery( deadLetters: ActorRef, resendInterval: FiniteDuration, maxBufferSize: Int) - extends GraphStage[FlowShape[Send, Send]] { + extends GraphStage[FlowShape[OutboundEnvelope, OutboundEnvelope]] { import SystemMessageDelivery._ - val in: Inlet[Send] = Inlet("SystemMessageDelivery.in") - val out: Outlet[Send] = Outlet("SystemMessageDelivery.out") - override val shape: FlowShape[Send, Send] = FlowShape(in, out) + val in: Inlet[OutboundEnvelope] = Inlet("SystemMessageDelivery.in") + val out: Outlet[OutboundEnvelope] = Outlet("SystemMessageDelivery.out") + override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { private var replyObserverAttached = false private var seqNo = 0L // sequence number for the first message will be 1 - private val unacknowledged = new ArrayDeque[Send] - private var resending = new ArrayDeque[Send] + private val unacknowledged = new ArrayDeque[OutboundEnvelope] + private var resending = new ArrayDeque[OutboundEnvelope] private var resendingFromSeqNo = -1L private var stopping = false @@ -156,43 +155,49 @@ private[akka] class SystemMessageDelivery( private def tryResend(): Unit = { if (isAvailable(out) && !resending.isEmpty) - push(out, resending.poll()) + pushCopy(resending.poll()) + } + + // important to not send the buffered instance, since it's mutable + private def pushCopy(outboundEnvelope: OutboundEnvelope): Unit = { + push(out, outboundEnvelope.copy()) } // InHandler override def onPush(): Unit = { - grab(in) match { - case s @ Send(_: HandshakeReq, _, _, _) ⇒ + val outboundEnvelope = grab(in) + outboundEnvelope.message match { + case _: HandshakeReq ⇒ // pass on HandshakeReq if (isAvailable(out)) - push(out, s) - case s @ Send(ClearSystemMessageDelivery, _, _, _) ⇒ + pushCopy(outboundEnvelope) + case ClearSystemMessageDelivery ⇒ clear() pull(in) - case s @ Send(msg: ControlMessage, _, _, _) ⇒ + case _: ControlMessage ⇒ // e.g. ActorSystemTerminating, no need for acked delivery if (resending.isEmpty && isAvailable(out)) - push(out, s) + pushCopy(outboundEnvelope) else { - resending.offer(s) + resending.offer(outboundEnvelope) tryResend() } - case s @ Send(msg: AnyRef, _, _, _) ⇒ + case msg ⇒ if (unacknowledged.size < maxBufferSize) { seqNo += 1 - val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress)) - unacknowledged.offer(sendMsg) + val sendEnvelope = outboundEnvelope.withMessage(SystemMessageEnvelope(msg, seqNo, localAddress)) + unacknowledged.offer(sendEnvelope) scheduleOnce(ResendTick, resendInterval) if (resending.isEmpty && isAvailable(out)) - push(out, sendMsg) + pushCopy(sendEnvelope) else { - resending.offer(sendMsg) + resending.offer(sendEnvelope) tryResend() } } else { // buffer overflow outboundContext.quarantine(reason = s"System message delivery buffer overflow, size [$maxBufferSize]") - deadLetters ! s + deadLetters ! outboundEnvelope pull(in) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala index e9fba1ffdd..b354f32bda 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala @@ -9,7 +9,6 @@ import scala.concurrent.Promise import scala.concurrent.duration._ import akka.Done import akka.actor.Address -import akka.remote.EndpointManager.Send import akka.remote.transport.ThrottlerTransportAdapter.Blackhole import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle @@ -59,10 +58,10 @@ private[remote] final case class TestManagementMessage(command: Any, done: Promi * INTERNAL API */ private[remote] class OutboundTestStage(outboundContext: OutboundContext) - extends GraphStageWithMaterializedValue[FlowShape[Send, Send], TestManagementApi] { - val in: Inlet[Send] = Inlet("OutboundTestStage.in") - val out: Outlet[Send] = Outlet("OutboundTestStage.out") - override val shape: FlowShape[Send, Send] = FlowShape(in, out) + extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, OutboundEnvelope], TestManagementApi] { + val in: Inlet[OutboundEnvelope] = Inlet("OutboundTestStage.in") + val out: Outlet[OutboundEnvelope] = Outlet("OutboundTestStage.out") + override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val stoppedPromise = Promise[Done]() diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 5464d71f72..75b7ab0f08 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -27,8 +27,7 @@ private[remote] final class InboundActorRefCompression( settings: CompressionSettings, originUid: Long, inboundContext: InboundContext, - heavyHitters: TopHeavyHitters[ActorRef] -) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters, _.path.toSerializationFormat) { + heavyHitters: TopHeavyHitters[ActorRef]) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters, _.path.toSerializationFormat) { preAllocate(system.deadLetters) @@ -55,8 +54,7 @@ final class InboundManifestCompression( settings: CompressionSettings, originUid: Long, inboundContext: InboundContext, - heavyHitters: TopHeavyHitters[String] -) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters, ConstantFun.scalaIdentityFunction) { + heavyHitters: TopHeavyHitters[String]) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters, ConstantFun.scalaIdentityFunction) { scheduleNextTableAdvertisement() override protected def tableAdvertisementInterval = settings.manifests.advertisementInterval @@ -144,7 +142,7 @@ private[remote] abstract class InboundCompression[T >: Null]( } else { val count = cms.addAndEstimateCount(key, n) - // TODO optimise order of these, what is more expensive? + // TODO optimise order of these, what is more expensive? // TODO (now the `previous` is, but if aprox datatype there it would be faster)... Needs pondering. val wasHeavyHitter = addAndCheckIfheavyHitterDetected(value, count) if (wasHeavyHitter) @@ -160,7 +158,7 @@ private[remote] abstract class InboundCompression[T >: Null]( key match { case null ⇒ true case "" ⇒ true // empty class manifest for example - case _ ⇒ key.endsWith("/system/dummy") || key.endsWith("/") // TODO dummy likely shouldn't exist? can we remove it? + case _ ⇒ key.endsWith("/") } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala index c6249a383e..e99219bad7 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala @@ -370,7 +370,7 @@ class FlightRecorderSpec extends AkkaSpec { } - def withFlightRecorder(body: (FlightRecorder, FlightRecorderReader, FileChannel) ⇒ Unit): Unit = { + private def withFlightRecorder(body: (FlightRecorder, FlightRecorderReader, FileChannel) ⇒ Unit): Unit = { val file = File.createTempFile("artery", ".afr") file.deleteOnExit() diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala index 7b79fdaced..c4b5569a0e 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala @@ -33,17 +33,18 @@ class OutboundControlJunctionSpec extends AkkaSpec with ImplicitSender { val addressA = UniqueAddress(Address("artery", "sysA", "hostA", 1001), 1) val addressB = UniqueAddress(Address("artery", "sysB", "hostB", 1002), 2) + private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) + "Control messages" must { "be injected via side channel" in { val inboundContext = new TestInboundContext(localAddress = addressA) val outboundContext = inboundContext.association(addressB.address) - val destination = null.asInstanceOf[RemoteActorRef] // not used val ((upstream, controlIngress), downstream) = TestSource.probe[String] - .map(msg ⇒ Send(msg, OptionVal.None, destination, None)) - .viaMat(new OutboundControlJunction(outboundContext))(Keep.both) - .map { case Send(msg, _, _, _) ⇒ msg } + .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.None)) + .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.both) + .map(env ⇒ env.message) .toMat(TestSink.probe[Any])(Keep.both) .run() diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala index 3dd7227cb6..6066d042fc 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -31,16 +31,17 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { val addressA = UniqueAddress(Address("artery", "sysA", "hostA", 1001), 1) val addressB = UniqueAddress(Address("artery", "sysB", "hostB", 1002), 2) + private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) + private def setupStream( outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds, retryInterval: FiniteDuration = 10.seconds, injectHandshakeInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = { - val destination = null.asInstanceOf[RemoteActorRef] // not used TestSource.probe[String] - .map(msg ⇒ Send(msg, OptionVal.None, destination, None)) - .via(new OutboundHandshake(system, outboundContext, timeout, retryInterval, injectHandshakeInterval)) - .map { case Send(msg, _, _, _) ⇒ msg } + .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.None)) + .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, timeout, retryInterval, injectHandshakeInterval)) + .map(env ⇒ env.message) .toMat(TestSink.probe[Any])(Keep.both) .run() } diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 2ebedb00c3..624acf337a 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -16,7 +16,6 @@ import akka.actor.InternalActorRef import akka.actor.PoisonPill import akka.actor.RootActorPath import akka.remote.{ AddressUidExtension, RARP, RemoteActorRef, UniqueAddress } -import akka.remote.EndpointManager.Send import akka.remote.artery.SystemMessageDelivery._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings @@ -62,41 +61,46 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi val matSettings = ActorMaterializerSettings(system).withFuzzing(true) implicit val mat = ActorMaterializer(matSettings)(system) + private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) + override def afterTermination(): Unit = shutdown(systemB) - private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[Send, NotUsed] = { - val remoteRef = null.asInstanceOf[RemoteActorRef] // not used + private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[OutboundEnvelope, NotUsed] = { val deadLetters = TestProbe().ref Source(1 to sendCount) - .map(n ⇒ Send("msg-" + n, OptionVal.None, remoteRef, None)) + .map(n ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, "msg-" + n, OptionVal.None)) .via(new SystemMessageDelivery(outboundContext, deadLetters, resendInterval, maxBufferSize = 1000)) } - private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = { + private def inbound(inboundContext: InboundContext): Flow[OutboundEnvelope, InboundEnvelope, NotUsed] = { val recipient = OptionVal.None // not used - Flow[Send] - .map { - case Send(sysEnv: SystemMessageEnvelope, _, _, _) ⇒ + Flow[OutboundEnvelope] + .map(outboundEnvelope ⇒ outboundEnvelope.message match { + case sysEnv: SystemMessageEnvelope ⇒ InboundEnvelope(recipient, addressB.address, sysEnv, OptionVal.None, addressA.uid, inboundContext.association(addressA.uid)) - } + }) .async .via(new SystemMessageAcker(inboundContext)) } - private def drop(dropSeqNumbers: Vector[Long]): Flow[Send, Send, NotUsed] = { - Flow[Send] + private def drop(dropSeqNumbers: Vector[Long]): Flow[OutboundEnvelope, OutboundEnvelope, NotUsed] = { + Flow[OutboundEnvelope] .statefulMapConcat(() ⇒ { var dropping = dropSeqNumbers { - case s @ Send(SystemMessageEnvelope(_, seqNo, _), _, _, _) ⇒ - val i = dropping.indexOf(seqNo) - if (i >= 0) { - dropping = dropping.updated(i, -1L) - Nil - } else - List(s) + outboundEnvelope ⇒ + outboundEnvelope.message match { + case SystemMessageEnvelope(_, seqNo, _) ⇒ + val i = dropping.indexOf(seqNo) + if (i >= 0) { + dropping = dropping.updated(i, -1L) + Nil + } else + List(outboundEnvelope) + case _ ⇒ Nil + } } }) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 1d378a6ea9..41c12280f8 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -89,9 +89,6 @@ private[akka] class TestOutboundContext( OptionVal.None)) } - // FIXME we should be able to Send without a recipient ActorRef - override def dummyRecipient: RemoteActorRef = null - } private[akka] class TestControlMessageSubject extends ControlMessageSubject {