diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 914fc409b2..46b195cd4e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -89,18 +89,20 @@ private[akka] object RemoteActorRefProvider { import EndpointManager.Send override def !(message: Any)(implicit sender: ActorRef): Unit = message match { - case Send(m, senderOption, _, seqOpt) ⇒ + case Send(m, senderOption, recipient, seqOpt) ⇒ // else ignore: it is a reliably delivered message that might be retried later, and it has not yet deserved // the dead letter status - if (seqOpt.isEmpty) super.!(m)(senderOption.orNull) + if (seqOpt.isEmpty) super.!(DeadLetter(m, senderOption.getOrElse(_provider.deadLetters), recipient)) case DeadLetter(Send(m, senderOption, recipient, seqOpt), _, _) ⇒ // else ignore: it is a reliably delivered message that might be retried later, and it has not yet deserved // the dead letter status - if (seqOpt.isEmpty) super.!(m)(senderOption.orNull) + if (seqOpt.isEmpty) super.!(DeadLetter(m, senderOption.getOrElse(_provider.deadLetters), recipient)) case env: OutboundEnvelope ⇒ - super.!(env.message)(env.sender.orNull) + super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters), + env.recipient.getOrElse(_provider.deadLetters))) case DeadLetter(env: OutboundEnvelope, _, _) ⇒ - super.!(env.message)(env.sender.orNull) + super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters), + env.recipient.getOrElse(_provider.deadLetters))) case _ ⇒ super.!(message)(sender) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 552ce77b65..8094e22c02 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -43,6 +43,7 @@ import akka.util.OptionVal import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.remote.artery.compress.CompressionProtocol._ import akka.stream.scaladsl.MergeHub +import akka.actor.DeadLetter /** * INTERNAL API @@ -144,6 +145,8 @@ private[remote] class Association( _testStages.asScala.toList } + private def deadletters = transport.system.deadLetters + def outboundControlIngress: OutboundControlIngress = { if (_outboundControlIngress ne null) _outboundControlIngress @@ -231,6 +234,14 @@ private[remote] class Association( // volatile read to see latest queue array val unused = queuesVisibility + def dropped(qSize: Int, env: OutboundEnvelope): Unit = { + log.debug( + "Dropping message [{}] from [{}] to [{}] due to overflow of send queue, size [{}]", + message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), qSize) + // FIXME AFR + deadletters ! env + } + // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system // FIXME where is that ActorSelectionMessage check in old remoting? if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { @@ -239,7 +250,7 @@ private[remote] class Association( val outboundEnvelope = createOutboundEnvelope() if (!controlQueue.offer(createOutboundEnvelope())) { quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") - transport.system.deadLetters ! outboundEnvelope + dropped(controlQueueSize, outboundEnvelope) } case _: DaemonMsgCreate ⇒ // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because @@ -248,21 +259,24 @@ private[remote] class Association( // destination) before the first ordinary message arrives. val outboundEnvelope1 = createOutboundEnvelope() if (!controlQueue.offer(outboundEnvelope1)) - transport.system.deadLetters ! outboundEnvelope1 + dropped(controlQueueSize, outboundEnvelope1) (0 until outboundLanes).foreach { i ⇒ val outboundEnvelope2 = createOutboundEnvelope() if (!queues(OrdinaryQueueIndex + i).offer(outboundEnvelope2)) - transport.system.deadLetters ! outboundEnvelope2 + dropped(queueSize, outboundEnvelope2) } case _ ⇒ val outboundEnvelope = createOutboundEnvelope() val queue = selectQueue(recipient) val offerOk = queue.offer(outboundEnvelope) if (!offerOk) - transport.system.deadLetters ! outboundEnvelope + dropped(queueSize, outboundEnvelope) + } } else if (log.isDebugEnabled) - log.debug("Dropping message to quarantined system {}", remoteAddress) + log.debug( + "Dropping message [{}] from [{}] to [{}] due to quarantined system [{}]", + message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), remoteAddress) } private def selectQueue(recipient: OptionVal[RemoteActorRef]): ProducerApi[OutboundEnvelope] = {