send dropped system messages to deadLetters

* publish remote lifecycle event for quarantined
This commit is contained in:
Patrik Nordwall 2016-06-08 12:40:40 +02:00
parent 7a1a316e8a
commit 7ce6dffabf
6 changed files with 28 additions and 16 deletions

View file

@ -497,12 +497,14 @@ private[akka] class RemoteActorRef private[akka] (
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
override private[akka] def isTerminated: Boolean = false
private def handleException: Catcher[Unit] = {
private def handleException(message: Any, sender: ActorRef): Catcher[Unit] = {
case e: InterruptedException
remote.system.eventStream.publish(Error(e, path.toString, getClass, "interrupted during message send"))
remote.system.deadLetters.tell(message, sender)
Thread.currentThread.interrupt()
case NonFatal(e)
remote.system.eventStream.publish(Error(e, path.toString, getClass, "swallowing exception during message send"))
remote.system.deadLetters.tell(message, sender)
}
/**
@ -529,11 +531,11 @@ private[akka] class RemoteActorRef private[akka] (
provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher)
case _ remote.send(message, OptionVal.None, this)
}
} catch handleException
} catch handleException(message, Actor.noSender)
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
if (message == null) throw new InvalidMessageException("Message is null")
try remote.send(message, OptionVal(sender), this) catch handleException
try remote.send(message, OptionVal(sender), this) catch handleException(message, sender)
}
override def provider: RemoteActorRefProvider = remote.provider

View file

@ -9,16 +9,13 @@ import java.nio.channels.DatagramChannel
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
@ -68,13 +65,10 @@ import org.agrona.IoUtil
import java.io.File
import java.net.InetSocketAddress
import java.nio.channels.{ DatagramChannel, FileChannel }
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import io.aeron.CncFileDescriptor
import java.util.concurrent.atomic.AtomicLong
import akka.actor.Cancellable
import scala.collection.JavaConverters._
import akka.stream.ActorMaterializerSettings
import scala.annotation.tailrec
@ -331,7 +325,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
override def addresses: Set[Address] = _addresses
override def localAddressForRemote(remote: Address): Address = defaultAddress
override val log: LoggingAdapter = Logging(system, getClass.getName)
private val eventPublisher = new EventPublisher(system, log, remoteSettings.RemoteLifecycleEventsLogLevel)
val eventPublisher = new EventPublisher(system, log, remoteSettings.RemoteLifecycleEventsLogLevel)
private val codec: AkkaPduCodec = AkkaPduProtobufCodec
private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
@ -705,7 +699,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval))
.via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval, remoteSettings.SysMsgBufferSize))
.via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval,
remoteSettings.SysMsgBufferSize))
.viaMat(new OutboundControlJunction(outboundContext))(Keep.right)
.via(encoder)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner,

View file

@ -36,6 +36,7 @@ import akka.stream.scaladsl.Source
import akka.util.{ Unsafe, WildcardTree }
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import akka.util.OptionVal
import akka.remote.QuarantinedEvent
/**
* INTERNAL API
@ -223,6 +224,8 @@ private[remote] class Association(
log.warning(
"Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}",
remoteAddress, u, reason)
// FIXME when we complete the switch to Long UID we must use Long here also, issue #20644
transport.eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, u.toInt))
// end delivery of system messages to that incarnation after this point
send(ClearSystemMessageDelivery, OptionVal.None, dummyRecipient)
// try to tell the other system that we have quarantined it

View file

@ -23,6 +23,7 @@ import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.actor.ActorRef
/**
* INTERNAL API
@ -44,6 +45,7 @@ private[akka] object SystemMessageDelivery {
*/
private[akka] class SystemMessageDelivery(
outboundContext: OutboundContext,
deadLetters: ActorRef,
resendInterval: FiniteDuration,
maxBufferSize: Int)
extends GraphStage[FlowShape[Send, Send]] {
@ -87,6 +89,8 @@ private[akka] class SystemMessageDelivery(
}
override def postStop(): Unit = {
sendUnacknowledgedToDeadLetters()
unacknowledged.clear()
outboundContext.controlSubject.detach(this)
}
@ -180,12 +184,14 @@ private[akka] class SystemMessageDelivery(
} else {
// buffer overflow
outboundContext.quarantine(reason = s"System message delivery buffer overflow, size [$maxBufferSize]")
deadLetters ! s
pull(in)
}
}
}
private def clear(): Unit = {
sendUnacknowledgedToDeadLetters()
seqNo = 0L // sequence number for the first message will be 1
unacknowledged.clear()
resending.clear()
@ -193,6 +199,13 @@ private[akka] class SystemMessageDelivery(
cancelTimer(resendInterval)
}
private def sendUnacknowledgedToDeadLetters(): Unit = {
val iter = unacknowledged.iterator
while (iter.hasNext()) {
deadLetters ! iter.next()
}
}
// OutHandler
override def onPull(): Unit = {
if (replyObserverAttached) { // otherwise it will be pulled after attached

View file

@ -72,8 +72,7 @@ class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with Im
}
}
// FIXME this is failing with Artery
"receive Terminated when watched node is unknown host" ignore {
"receive Terminated when watched node is unknown host" in {
val path = RootActorPath(Address("artery", system.name, "unknownhost", 2552)) / "user" / "subject"
system.actorOf(Props(new Actor {
context.watch(context.actorFor(path))
@ -85,8 +84,7 @@ class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with Im
expectMsg(60.seconds, path)
}
// FIXME this is failing with Artery
"receive ActorIdentity(None) when identified node is unknown host" ignore {
"receive ActorIdentity(None) when identified node is unknown host" in {
val path = RootActorPath(Address("artery", system.name, "unknownhost2", 2552)) / "user" / "subject"
system.actorSelection(path) ! Identify(path)
expectMsg(60.seconds, ActorIdentity(path, None))

View file

@ -66,9 +66,10 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[Send, NotUsed] = {
val remoteRef = null.asInstanceOf[RemoteActorRef] // not used
val deadLetters = TestProbe().ref
Source(1 to sendCount)
.map(n Send("msg-" + n, OptionVal.None, remoteRef, None))
.via(new SystemMessageDelivery(outboundContext, resendInterval, maxBufferSize = 1000))
.via(new SystemMessageDelivery(outboundContext, deadLetters, resendInterval, maxBufferSize = 1000))
}
private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = {