diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 119b406d8f..70bf13e572 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -497,12 +497,14 @@ private[akka] class RemoteActorRef private[akka] ( @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override private[akka] def isTerminated: Boolean = false - private def handleException: Catcher[Unit] = { + private def handleException(message: Any, sender: ActorRef): Catcher[Unit] = { case e: InterruptedException ⇒ remote.system.eventStream.publish(Error(e, path.toString, getClass, "interrupted during message send")) + remote.system.deadLetters.tell(message, sender) Thread.currentThread.interrupt() case NonFatal(e) ⇒ remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send")) + remote.system.deadLetters.tell(message, sender) } /** @@ -529,11 +531,11 @@ private[akka] class RemoteActorRef private[akka] ( provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher) case _ ⇒ remote.send(message, OptionVal.None, this) } - } catch handleException + } catch handleException(message, Actor.noSender) override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { if (message == null) throw new InvalidMessageException("Message is null") - try remote.send(message, OptionVal(sender), this) catch handleException + try remote.send(message, OptionVal(sender), this) catch handleException(message, sender) } override def provider: RemoteActorRefProvider = remote.provider diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 5acdf104be..9edaa01bf2 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -9,16 +9,13 @@ import java.nio.channels.DatagramChannel import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong - import scala.collection.JavaConverters._ - import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.Done import akka.NotUsed import akka.actor.ActorRef @@ -68,13 +65,10 @@ import org.agrona.IoUtil import java.io.File import java.net.InetSocketAddress import java.nio.channels.{ DatagramChannel, FileChannel } - import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import io.aeron.CncFileDescriptor import java.util.concurrent.atomic.AtomicLong - import akka.actor.Cancellable - import scala.collection.JavaConverters._ import akka.stream.ActorMaterializerSettings import scala.annotation.tailrec @@ -331,7 +325,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def addresses: Set[Address] = _addresses override def localAddressForRemote(remote: Address): Address = defaultAddress override val log: LoggingAdapter = Logging(system, getClass.getName) - private val eventPublisher = new EventPublisher(system, log, remoteSettings.RemoteLifecycleEventsLogLevel) + val eventPublisher = new EventPublisher(system, log, remoteSettings.RemoteLifecycleEventsLogLevel) private val codec: AkkaPduCodec = AkkaPduProtobufCodec private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") @@ -705,7 +699,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) - .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval, remoteSettings.SysMsgBufferSize)) + .via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval, + remoteSettings.SysMsgBufferSize)) .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 8a4e85c5ee..ca90c9a366 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -36,6 +36,7 @@ import akka.stream.scaladsl.Source import akka.util.{ Unsafe, WildcardTree } import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.util.OptionVal +import akka.remote.QuarantinedEvent /** * INTERNAL API @@ -223,6 +224,8 @@ private[remote] class Association( log.warning( "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}", remoteAddress, u, reason) + // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644 + transport.eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, u.toInt)) // end delivery of system messages to that incarnation after this point send(ClearSystemMessageDelivery, OptionVal.None, dummyRecipient) // try to tell the other system that we have quarantined it diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index a6ba8b25fe..de2ee9da60 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -23,6 +23,7 @@ import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic import akka.remote.artery.OutboundHandshake.HandshakeReq +import akka.actor.ActorRef /** * INTERNAL API @@ -44,6 +45,7 @@ private[akka] object SystemMessageDelivery { */ private[akka] class SystemMessageDelivery( outboundContext: OutboundContext, + deadLetters: ActorRef, resendInterval: FiniteDuration, maxBufferSize: Int) extends GraphStage[FlowShape[Send, Send]] { @@ -87,6 +89,8 @@ private[akka] class SystemMessageDelivery( } override def postStop(): Unit = { + sendUnacknowledgedToDeadLetters() + unacknowledged.clear() outboundContext.controlSubject.detach(this) } @@ -180,12 +184,14 @@ private[akka] class SystemMessageDelivery( } else { // buffer overflow outboundContext.quarantine(reason = s"System message delivery buffer overflow, size [$maxBufferSize]") + deadLetters ! s pull(in) } } } private def clear(): Unit = { + sendUnacknowledgedToDeadLetters() seqNo = 0L // sequence number for the first message will be 1 unacknowledged.clear() resending.clear() @@ -193,6 +199,13 @@ private[akka] class SystemMessageDelivery( cancelTimer(resendInterval) } + private def sendUnacknowledgedToDeadLetters(): Unit = { + val iter = unacknowledged.iterator + while (iter.hasNext()) { + deadLetters ! iter.next() + } + } + // OutHandler override def onPull(): Unit = { if (replyObserverAttached) { // otherwise it will be pulled after attached diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala index 7313806127..297cb9fcc0 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala @@ -72,8 +72,7 @@ class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with Im } } - // FIXME this is failing with Artery - "receive Terminated when watched node is unknown host" ignore { + "receive Terminated when watched node is unknown host" in { val path = RootActorPath(Address("artery", system.name, "unknownhost", 2552)) / "user" / "subject" system.actorOf(Props(new Actor { context.watch(context.actorFor(path)) @@ -85,8 +84,7 @@ class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with Im expectMsg(60.seconds, path) } - // FIXME this is failing with Artery - "receive ActorIdentity(None) when identified node is unknown host" ignore { + "receive ActorIdentity(None) when identified node is unknown host" in { val path = RootActorPath(Address("artery", system.name, "unknownhost2", 2552)) / "user" / "subject" system.actorSelection(path) ! Identify(path) expectMsg(60.seconds, ActorIdentity(path, None)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index a84c964338..5f9a3e0b50 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -66,9 +66,10 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[Send, NotUsed] = { val remoteRef = null.asInstanceOf[RemoteActorRef] // not used + val deadLetters = TestProbe().ref Source(1 to sendCount) .map(n ⇒ Send("msg-" + n, OptionVal.None, remoteRef, None)) - .via(new SystemMessageDelivery(outboundContext, resendInterval, maxBufferSize = 1000)) + .via(new SystemMessageDelivery(outboundContext, deadLetters, resendInterval, maxBufferSize = 1000)) } private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = {