Merge pull request #21398 from akka/wip-fixme-patriknw

fix a bunch of FIXMEs in Artery
Patrik Nordwall 2016-09-08 19:15:43 +02:00 committed by GitHub
commit 02de58392a
30 changed files with 182 additions and 112 deletions

View file

@ -75,6 +75,8 @@ class CodecBenchmark {
override def sendControl(to: Address, message: ControlMessage): Unit = ???
override def association(remoteAddress: Address): OutboundContext = ???
override def completeHandshake(peer: UniqueAddress): Future[Done] = ???
override lazy val settings: ArterySettings =
ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery"))
}
private var materializer: ActorMaterializer = _

View file

@ -90,7 +90,7 @@ private[cluster] class ClusterRemoteWatcher(
if (m.address != selfAddress) {
clusterNodes -= m.address
if (previousStatus == MemberStatus.Down) {
quarantine(m.address, Some(m.uniqueAddress.uid))
quarantine(m.address, Some(m.uniqueAddress.uid), "Cluster member removed")
}
publishAddressTerminated(m.address)
}

View file

@ -58,7 +58,7 @@ abstract class PiercingShouldKeepQuarantineSpec(multiNodeConfig: PiercingShouldK
enterBarrier("actor-identified")
// Manually Quarantine the other system
RARP(system).provider.transport.quarantine(node(second).address, Some(uid))
RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test")
// Quarantining is not immediate
Thread.sleep(1000)

View file

@ -78,7 +78,7 @@ abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePie
enterBarrier("actor-identified")
// Manually Quarantine the other system
RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst))
RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst), "test")
// Quarantine is up -- Cannot communicate with remote system any more
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify"

View file

@ -80,7 +80,7 @@ abstract class RemoteRestartedQuarantinedSpec
val (uid, ref) = identifyWithUid(second, "subject")
RARP(system).provider.transport.quarantine(node(second).address, Some(uid))
RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test")
enterBarrier("quarantined")
enterBarrier("still-quarantined")

View file

@ -73,7 +73,7 @@ abstract class RemoteRestartedQuarantinedSpec
val (uid, ref) = identifyWithUid(second, "subject", 5.seconds)
enterBarrier("before-quarantined")
RARP(system).provider.transport.quarantine(node(second).address, Some(uid))
RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test")
enterBarrier("quarantined")
enterBarrier("still-quarantined")

View file

@ -85,7 +85,6 @@ akka {
remote {
### FIXME: Temporary switch for the PoC
artery {
enabled = off
port = 20200
@ -166,10 +165,33 @@ akka {
# different destination actors. The selection of lane is based on consistent
# hashing of the recipient ActorRef to preserve message ordering per receiver.
inbound-lanes = 1
# Size of the send queue for outgoing messages. Messages will be dropped if
# the queue becomes full. This may happen if you send a burst of many messages
# without end-to-end flow control. Note that there is one such queue per
# outbound association. The trade-off of using a larger queue size is that
# it consumes more memory, since the queue is based on preallocated array with
# fixed size.
outbound-message-queue-size = 3072
# Size of the send queue for outgoing control messages, such as system messages.
# If this limit is reached the remote system is declared to be dead and its UID
# marked as quarantined.
# The trade-off of using a larger queue size is that it consumes more memory,
# since the queue is based on preallocated array with fixed size.
outbound-control-queue-size = 3072
# Size of the send queue for outgoing large messages. Messages will be dropped if
# the queue becomes full. This may happen if you send a burst of many messages
# without end-to-end flow control. Note that there is one such queue per
# outbound association. The trade-off of using a larger queue size is that
# it consumes more memory, since the queue is based on preallocated array with
# fixed size.
outbound-large-message-queue-size = 256
# This setting defines the maximum number of unacknowledged system messages
# allowed for a remote system. If this limit is reached the remote system is
# declared to be dead and its UID marked as tainted.
# declared to be dead and its UID marked as quarantined.
system-message-buffer-size = 20000
# unacknowledged system messages are re-delivered with this interval
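For tuning, these new queue sizes can be overridden per application. A minimal sketch, assuming the settings live under akka.remote.artery.advanced as the Advanced settings group in ArterySettings suggests (the concrete numbers are illustrative, not recommendations):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Sketch: override the new Artery send-queue sizes (values here are only examples).
val tuned = ConfigFactory.parseString("""
  akka.remote.artery.advanced {
    outbound-message-queue-size = 6144
    outbound-control-queue-size = 3072
    outbound-large-message-queue-size = 512
  }
""").withFallback(ConfigFactory.load())

val system = ActorSystem("tuned", tuned)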

View file

@ -449,7 +449,8 @@ private[akka] class RemoteActorRefProvider(
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
*/
def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid)
def quarantine(address: Address, uid: Option[Int], reason: String): Unit =
transport.quarantine(address, uid, reason)
}
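The specs further down in this commit call the new three-argument quarantine directly; a minimal sketch of that call pattern, assuming code that lives inside the akka package (RARP is an internal extension, and the reason string here is illustrative):

package akka.remote.artery.docs // hypothetical package, only so the internal RARP extension is visible

import akka.actor.{ ActorSystem, Address }
import akka.remote.RARP

object QuarantineWithReason {
  // Mirrors the spec usage in this commit: quarantine now always carries a human-readable reason.
  def quarantine(system: ActorSystem, remote: Address, uid: Option[Int]): Unit =
    RARP(system).provider.transport.quarantine(remote, uid, "manual quarantine for test/diagnostics")
}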

View file

@ -90,6 +90,6 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
*/
def quarantine(address: Address, uid: Option[Int]): Unit
def quarantine(address: Address, uid: Option[Int], reason: String): Unit
}

View file

@ -158,7 +158,7 @@ private[akka] class RemoteWatcher(
watchingNodes foreach { a ⇒
if (!unreachable(a) && !failureDetector.isAvailable(a)) {
log.warning("Detected unreachable: [{}]", a)
quarantine(a, addressUids.get(a))
quarantine(a, addressUids.get(a), "Deemed unreachable by remote failure detector")
publishAddressTerminated(a)
unreachable += a
}
@ -167,8 +167,8 @@ private[akka] class RemoteWatcher(
def publishAddressTerminated(address: Address): Unit =
AddressTerminatedTopic(context.system).publish(AddressTerminated(address))
def quarantine(address: Address, uid: Option[Int]): Unit =
remoteProvider.quarantine(address, uid)
def quarantine(address: Address, uid: Option[Int], reason: String): Unit =
remoteProvider.quarantine(address, uid, reason)
def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = {
assert(watcher != self)
@ -282,4 +282,4 @@ private[akka] class RemoteWatcher(
log.debug("Re-watch [{} -> {}]", watcher.path, watchee.path)
watchee.sendSystemMessage(Watch(watchee, watcher)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
}
}
}

View file

@ -225,7 +225,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null)
}
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = endpointManager match {
override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = endpointManager match {
case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid)
case _ ⇒ throw new RemoteTransportExceptionNoStackTrace(
s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null)

View file

@ -30,11 +30,13 @@ object AeronSink {
final class GaveUpSendingException(msg: String) extends RuntimeException(msg) with NoStackTrace
final class PublicationClosedException(msg: String) extends RuntimeException(msg) with NoStackTrace
private val TimerCheckPeriod = 1 << 13 // 8192
private val TimerCheckMask = TimerCheckPeriod - 1
private final class OfferTask(pub: Publication, var buffer: UnsafeBuffer, var msgSize: Int, onOfferSuccess: AsyncCallback[Unit],
giveUpAfter: Duration, onGiveUp: AsyncCallback[Unit])
giveUpAfter: Duration, onGiveUp: AsyncCallback[Unit], onPublicationClosed: AsyncCallback[Unit])
extends (() ⇒ Boolean) {
val giveUpAfterNanos = giveUpAfter match {
case f: FiniteDuration ⇒ f.toNanos
@ -54,6 +56,9 @@ object AeronSink {
n = 0L
onOfferSuccess.invoke(())
true
} else if (result == Publication.CLOSED) {
onPublicationClosed.invoke(())
true
} else if (giveUpAfterNanos >= 0 && (n & TimerCheckMask) == 0 && (System.nanoTime() - startTime) > giveUpAfterNanos) {
// the task is invoked by the spinning thread, only check nanoTime each 8192th invocation
n = 0L
@ -99,7 +104,7 @@ class AeronSink(
private var backoffCount = spinning
private var lastMsgSize = 0
private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ ⇒ taskOnOfferSuccess()),
giveUpSendAfter, getAsyncCallback(_ ⇒ onGiveUp()))
giveUpSendAfter, getAsyncCallback(_ ⇒ onGiveUp()), getAsyncCallback(_ ⇒ onPublicationClosed()))
private val addOfferTask: Add = Add(offerTask)
private var offerTaskInProgress = false
@ -135,21 +140,18 @@ class AeronSink(
@tailrec private def publish(): Unit = {
val result = pub.offer(envelopeInFlight.aeronBuffer, 0, lastMsgSize)
// FIXME handle Publication.CLOSED
if (result < 0) {
backoffCount -= 1
if (backoffCount > 0) {
ThreadHints.onSpinWait()
publish() // recursive
} else {
// delegate backoff to shared TaskRunner
offerTaskInProgress = true
// visibility of these assignments are ensured by adding the task to the command queue
offerTask.buffer = envelopeInFlight.aeronBuffer
offerTask.msgSize = lastMsgSize
delegateTaskStartTime = System.nanoTime()
taskRunner.command(addOfferTask)
flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, countBeforeDelegate)
if (result == Publication.CLOSED)
onPublicationClosed()
else if (result == Publication.NOT_CONNECTED)
delegateBackoff()
else {
backoffCount -= 1
if (backoffCount > 0) {
ThreadHints.onSpinWait()
publish() // recursive
} else
delegateBackoff()
}
} else {
countBeforeDelegate += 1
@ -157,6 +159,17 @@ class AeronSink(
}
}
private def delegateBackoff(): Unit = {
// delegate backoff to shared TaskRunner
offerTaskInProgress = true
// visibility of these assignments are ensured by adding the task to the command queue
offerTask.buffer = envelopeInFlight.aeronBuffer
offerTask.msgSize = lastMsgSize
delegateTaskStartTime = System.nanoTime()
taskRunner.command(addOfferTask)
flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, countBeforeDelegate)
}
private def taskOnOfferSuccess(): Unit = {
countBeforeDelegate = 0
flightRecorder.hiFreq(AeronSink_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime)
@ -184,6 +197,15 @@ class AeronSink(
failStage(cause)
}
private def onPublicationClosed(): Unit = {
offerTaskInProgress = false
val cause = new PublicationClosedException(s"Aeron Publication to [${channel}] was closed.")
// this is not expected, since we didn't close the publication ourselves
flightRecorder.alert(AeronSink_PublicationClosed, channelMetadata)
completedValue = Failure(cause)
failStage(cause)
}
override def onUpstreamFinish(): Unit = {
// flush outstanding offer before completing stage
if (!offerTaskInProgress)
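For context on the branches above: io.aeron's Publication.offer signals failure through negative result codes rather than exceptions, so the sink must decide per code whether to spin, delegate backoff, or fail. A standalone sketch of that decision, assuming an already-created Publication (the spin bound is illustrative):

import io.aeron.Publication
import org.agrona.concurrent.UnsafeBuffer

// Sketch only: classify Publication.offer results roughly the way the new publish()/delegateBackoff split does.
// Returns true when the message was accepted, false when the caller should hand off to a backoff task.
def offerOnce(pub: Publication, buffer: UnsafeBuffer, length: Int, spins: Int = 1000): Boolean = {
  var remaining = spins
  while (remaining > 0) {
    val result = pub.offer(buffer, 0, length)
    if (result >= 0) return true                            // accepted into the log buffer
    else if (result == Publication.CLOSED)
      throw new IllegalStateException("publication closed") // unrecoverable: fail the stream
    else if (result == Publication.NOT_CONNECTED)
      return false                                          // no subscriber connected: back off instead of spinning
    else remaining -= 1                                      // BACK_PRESSURED / ADMIN_ACTION: spin and retry
  }
  false // spun out: delegate further backoff elsewhere
}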

View file

@ -57,6 +57,12 @@ private[akka] final class ArterySettings private (config: Config) {
n > 0, "inbound-lanes must be greater than zero")
val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring(
_ > 0, "system-message-buffer-size must be more than zero")
val OutboundMessageQueueSize: Int = getInt("outbound-message-queue-size").requiring(
_ > 0, "outbound-message-queue-size must be more than zero")
val OutboundControlQueueSize: Int = getInt("outbound-control-queue-size").requiring(
_ > 0, "outbound-control-queue-size must be more than zero")
val OutboundLargeMessageQueueSize: Int = getInt("outbound-large-message-queue-size").requiring(
_ > 0, "outbound-large-message-queue-size must be more than zero")
val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒
interval > Duration.Zero, "system-message-resend-interval must be more than zero")
val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval ⇒

View file

@ -104,6 +104,8 @@ private[akka] trait InboundContext {
def completeHandshake(peer: UniqueAddress): Future[Done]
def settings: ArterySettings
}
/**
@ -219,6 +221,8 @@ private[akka] trait OutboundContext {
*/
def controlSubject: ControlMessageSubject
def settings: ArterySettings
}
/**
@ -339,9 +343,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val largeEnvelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.MaximumPooledBuffers)
private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16)
// FIXME capacity of outboundEnvelopePool should probably be derived from the sendQueue capacity
// times a factor (for reasonable number of outbound streams)
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 3072 * 2)
// The outboundEnvelopePool is shared among all outbound associations
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity =
settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3)
val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder() match {
case None ⇒ (None, None, None)
@ -372,7 +376,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
priorityMessageDestinations,
outboundEnvelopePool))
def settings = provider.remoteSettings.Artery
override def settings = provider.remoteSettings.Artery
override def start(): Unit = {
startMediaDriver()
@ -539,7 +543,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def startAeronErrorLog(): Unit = {
val errorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE))
val lastTimestamp = new AtomicLong(0L)
import system.dispatcher // FIXME perhaps use another dispatcher for this
import system.dispatcher
aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) {
if (!isShutdown) {
val newLastTimestamp = errorLog.logErrors(log, lastTimestamp.get)
@ -708,7 +712,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
implicit val ec = materializer.executionContext
updateStreamCompletion(streamName, streamCompleted.recover { case _ Done })
streamCompleted.onFailure {
case ShutdownSignal ⇒ // shutdown as expected
case ShutdownSignal ⇒ // shutdown as expected
case _: AeronTerminated ⇒ // shutdown already in progress
case cause if isShutdown ⇒
// don't restart after shutdown, but log some details so we notice
log.error(cause, s"{} failed after shutdown. {}", streamName, cause.getMessage)
@ -835,9 +840,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit =
eventPublisher.notifyListeners(event)
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = {
// FIXME change the method signature (old remoting) to include reason and use Long uid?
association(remoteAddress).quarantine(reason = "", uid.map(_.toLong))
override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = {
// FIXME use Long uid
association(remoteAddress).quarantine(reason, uid.map(_.toLong))
}
def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] =
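For a sense of scale of the pool-capacity expression earlier in this hunk: with the outbound-message-queue-size default of 3072 from reference.conf above and, say, a single outbound lane, the shared envelope pool holds 3072 * 1 * 3 = 9216 entries. A trivial sketch of the same arithmetic (the lane count is an assumption, not taken from this diff):

// Illustrative only: mirrors settings.Advanced.OutboundMessageQueueSize * OutboundLanes * 3 with assumed values.
val outboundMessageQueueSize = 3072 // default from reference.conf above
val outboundLanes = 1               // assumed default, not shown in this diff
val outboundEnvelopePoolCapacity = outboundMessageQueueSize * outboundLanes * 3 // == 9216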

View file

@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.stream.KillSwitches
import scala.util.Failure
import scala.util.Success
import akka.remote.artery.ArteryTransport.AeronTerminated
/**
* INTERNAL API
@ -96,10 +97,15 @@ private[remote] class Association(
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])
extends AbstractAssociation with OutboundContext {
import Association._
import FlightRecorderEvents._
private val log = Logging(transport.system, getClass.getName)
private val flightRecorder = transport.createFlightRecorderEventSink(synchr = true)
private val restartCounter = new RestartCounter(transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout)
override def settings = transport.settings
private def advancedSettings = transport.settings.Advanced
private val restartCounter = new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout)
// We start with the raw wrapped queue and then it is replaced with the materialized value of
// the `SendQueue` after materialization. Using same underlying queue. This makes it possible to
@ -108,12 +114,10 @@ private[remote] class Association(
def createQueue(capacity: Int): Queue[OutboundEnvelope] =
new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity)
private val outboundLanes = transport.settings.Advanced.OutboundLanes
private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize
// FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue
// such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption
private val queueSize = 3072
private val largeQueueSize = 256
private val outboundLanes = advancedSettings.OutboundLanes
private val controlQueueSize = advancedSettings.OutboundControlQueueSize
private val queueSize = advancedSettings.OutboundMessageQueueSize
private val largeQueueSize = advancedSettings.OutboundLargeMessageQueueSize
private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = Array.ofDim(2 + outboundLanes)
queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize)) // control stream
@ -265,29 +269,29 @@ private[remote] class Association(
outboundControlIngress.sendControlMessage(message)
def send(message: Any, sender: OptionVal[ActorRef], recipient: OptionVal[RemoteActorRef]): Unit = {
def createOutboundEnvelope(): OutboundEnvelope =
outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender)
// volatile read to see latest queue array
val unused = queuesVisibility
def dropped(qSize: Int, env: OutboundEnvelope): Unit = {
def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = {
log.debug(
"Dropping message [{}] from [{}] to [{}] due to overflow of send queue, size [{}]",
message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), qSize)
// FIXME AFR
flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex)
deadletters ! env
}
// allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
// FIXME where is that ActorSelectionMessage check in old remoting?
if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
message match {
case _: SystemMessage | ClearSystemMessageDelivery | _: ControlMessage ⇒
val outboundEnvelope = createOutboundEnvelope()
if (!controlQueue.offer(createOutboundEnvelope())) {
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
dropped(controlQueueSize, outboundEnvelope)
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
case _: DaemonMsgCreate ⇒
// DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
@ -296,18 +300,19 @@ private[remote] class Association(
// destination) before the first ordinary message arrives.
val outboundEnvelope1 = createOutboundEnvelope()
if (!controlQueue.offer(outboundEnvelope1))
dropped(controlQueueSize, outboundEnvelope1)
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope1)
(0 until outboundLanes).foreach { i ⇒
val outboundEnvelope2 = createOutboundEnvelope()
if (!queues(OrdinaryQueueIndex + i).offer(outboundEnvelope2))
dropped(queueSize, outboundEnvelope2)
dropped(OrdinaryQueueIndex + i, queueSize, outboundEnvelope2)
}
case _ ⇒
val outboundEnvelope = createOutboundEnvelope()
val queue = selectQueue(recipient)
val queueIndex = selectQueue(recipient)
val queue = queues(queueIndex)
val offerOk = queue.offer(outboundEnvelope)
if (!offerOk)
dropped(queueSize, outboundEnvelope)
dropped(queueIndex, queueSize, outboundEnvelope)
}
} else if (log.isDebugEnabled)
@ -316,10 +321,10 @@ private[remote] class Association(
message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), remoteAddress)
}
private def selectQueue(recipient: OptionVal[RemoteActorRef]): ProducerApi[OutboundEnvelope] = {
private def selectQueue(recipient: OptionVal[RemoteActorRef]): Int = {
recipient match {
case OptionVal.Some(r) ⇒
val queueIndex = r.cachedSendQueueIndex match {
r.cachedSendQueueIndex match {
case -1 ⇒
// only happens when messages are sent to new remote destination
// and is then cached on the RemoteActorRef
@ -341,10 +346,9 @@ private[remote] class Association(
idx
case idx ⇒ idx
}
queues(queueIndex)
case OptionVal.None ⇒
queues(OrdinaryQueueIndex)
OrdinaryQueueIndex
}
}
@ -388,7 +392,6 @@ private[remote] class Association(
remoteAddress, reason)
}
case None ⇒
// FIXME should we do something more, old impl used gating?
log.warning("Quarantine of [{}] ignored because unknown UID", remoteAddress)
}
@ -430,7 +433,7 @@ private[remote] class Association(
.toMat(transport.outboundControl(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
if (advancedSettings.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
@ -466,7 +469,7 @@ private[remote] class Association(
.toMat(transport.outbound(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
if (advancedSettings.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
@ -510,7 +513,7 @@ private[remote] class Association(
val (queueValues, testMgmtValues) = a.unzip
val (changeCompressionValues, laneCompletedValues) = b.unzip
if (transport.settings.Advanced.TestMode)
if (advancedSettings.TestMode)
testMgmtValues.foreach(_testStages.add)
import transport.system.dispatcher
@ -546,7 +549,7 @@ private[remote] class Association(
.toMat(transport.outboundLarge(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
if (advancedSettings.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
@ -573,6 +576,7 @@ private[remote] class Association(
updateStreamCompletion(streamName, streamCompleted.recover { case _ Done })
streamCompleted.onFailure {
case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected
case _: AeronTerminated ⇒ // shutdown already in progress
case cause if transport.isShutdown ⇒
// don't restart after shutdown, but log some details so we notice
log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
@ -593,10 +597,8 @@ private[remote] class Association(
log.error(cause, "{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
lazyRestart()
} else {
log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. {cause.getMessage}",
streamName, remoteAddress, transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout.toSeconds)
// FIXME is this the right thing to do for outbound?
log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds)
transport.system.terminate()
}
}
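As background for the selectQueue change above: the reference.conf comment earlier in this commit says the lane is chosen by consistent hashing of the recipient ActorRef so per-receiver ordering is preserved, and the chosen index is then cached on the RemoteActorRef. A hypothetical sketch of that idea (the index constant and hash expression are illustrative, not Artery's actual code):

// Hypothetical lane selection: a stable hash of the recipient keeps all its messages on one lane.
val OrdinaryQueueIndex = 2 // assumed layout: queues(0) = control, queues(1) = large, lanes start at 2

def laneFor(recipientPath: String, outboundLanes: Int): Int =
  OrdinaryQueueIndex + (recipientPath.hashCode & Int.MaxValue) % outboundLanes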

View file

@ -313,7 +313,7 @@ private[remote] class Decoder(
val decoded = inEnvelopePool.acquire().init(
recipient,
localAddress, // FIXME: Is this needed anymore? What should we do here?
localAddress, // FIXME: this is used for the "non-local recipient" check in MessageDispatcher. Is this needed anymore?
sender,
originUid,
headerBuilder.serializer,
@ -326,7 +326,7 @@ private[remote] class Decoder(
// recipient for the first message that is sent to it, best effort retry
scheduleOnce(RetryResolveRemoteDeployedRecipient(
retryResolveRemoteDeployedRecipientAttempts,
headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) // FIXME IS THIS SAFE?
headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval)
} else
push(out, decoded)
}

View file

@ -60,13 +60,13 @@ private[akka] object InboundControlJunction {
* subject to get notification of incoming control
* messages.
*/
private[akka] trait ControlMessageSubject {
private[remote] trait ControlMessageSubject {
def attach(observer: ControlMessageObserver): Future[Done]
def detach(observer: ControlMessageObserver): Unit
def stopped: Future[Done]
}
private[akka] trait ControlMessageObserver {
private[remote] trait ControlMessageObserver {
/**
* Notification of incoming control message. The message
@ -176,7 +176,7 @@ private[akka] class OutboundControlJunction(
import OutboundControlJunction._
private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage)
private val maxControlMessageBufferSize: Int = 1024 // FIXME config
private val maxControlMessageBufferSize: Int = outboundContext.settings.Advanced.OutboundControlQueueSize
private val buffer = new ArrayDeque[OutboundEnvelope]
override def preStart(): Unit = {

View file

@ -132,7 +132,8 @@ private[remote] class RollingEventLogSection(
recordSize: Int) {
import RollingEventLogSection._
// FIXME: check if power of two
require(entryCount > 0, "entryCount must be greater than 0")
require((entryCount & (entryCount - 1)) == 0, "entryCount must be power of two")
private[this] val LogMask: Long = entryCount - 1L
private[this] val buffers: Array[MappedResizeableBuffer] = Array.tabulate(FlightRecorder.SnapshotCount) { logId ⇒
@ -237,7 +238,8 @@ private[akka] class FlightRecorder(val fileChannel: FileChannel) extends AtomicB
private[this] val globalSection = new MappedResizeableBuffer(fileChannel, 0, GlobalSectionSize)
// FIXME: check if power of two
require(SnapshotCount > 0, "SnapshotCount must be greater than 0")
require((SnapshotCount & (SnapshotCount - 1)) == 0, "SnapshotCount must be power of two")
private[this] val SnapshotMask = SnapshotCount - 1
private[this] val alertLogs =
new RollingEventLogSection(
@ -329,7 +331,7 @@ private[akka] class FlightRecorder(val fileChannel: FileChannel) extends AtomicB
private def prepareRichRecord(recordBuffer: ByteBuffer, code: Int, metadata: Array[Byte]): Unit = {
recordBuffer.clear()
// FIXME: This is a bit overkill, needs some smarter scheme later, no need to always store the wallclock
// TODO: This is a bit overkill, needs some smarter scheme later, no need to always store the wallclock
recordBuffer.putLong(clock.wallClockPart)
recordBuffer.putLong(clock.highSpeedPart)
recordBuffer.putInt(code)
@ -342,7 +344,7 @@ private[akka] class FlightRecorder(val fileChannel: FileChannel) extends AtomicB
recordBuffer.position(0)
}
// FIXME: Try to save as many bytes here as possible! We will see crazy throughput here
// TODO: Try to save as many bytes here as possible! We will see crazy throughput here
override def hiFreq(code: Long, param: Long): Unit = {
hiFreqBatchedEntries += 1
hiFreqBatchBuffer.putLong(code)
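Returning to the power-of-two requirement introduced earlier in this file's hunks: with a power-of-two entryCount, wrapping an index becomes a single bitwise AND against LogMask instead of a modulo, and (n & (n - 1)) == 0 is the standard test for that property (note that 0 also passes the bit test, which is why a positive-count check still matters). A small self-contained illustration:

// Power-of-two check and mask-based wrap-around, as used for the rolling event log indices.
def isPowerOfTwo(n: Long): Boolean = n > 0 && (n & (n - 1)) == 0

val entryCount = 1024L                 // must be a power of two
require(isPowerOfTwo(entryCount), "entryCount must be power of two")
val logMask = entryCount - 1L          // 0x3FF
val slot = 5000L & logMask             // same result as 5000L % entryCount, without a division
// slot == 904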

View file

@ -18,29 +18,30 @@ object FlightRecorderEvents {
val Transport_AeronErrorLogTaskStopped = 10
val Transport_MediaFileDeleted = 11
val Transport_FlightRecorderClose = 12
val Transport_SendQueueOverflow = 13
// Aeron Sink events
val AeronSink_Started = 13
val AeronSink_TaskRunnerRemoved = 14
val AeronSink_PublicationClosed = 15
val AeronSink_Stopped = 16
val AeronSink_EnvelopeGrabbed = 17
val AeronSink_EnvelopeOffered = 18
val AeronSink_GaveUpEnvelope = 19
val AeronSink_DelegateToTaskRunner = 20
val AeronSink_ReturnFromTaskRunner = 21
val AeronSink_Started = 50
val AeronSink_TaskRunnerRemoved = 51
val AeronSink_PublicationClosed = 52
val AeronSink_Stopped = 53
val AeronSink_EnvelopeGrabbed = 54
val AeronSink_EnvelopeOffered = 55
val AeronSink_GaveUpEnvelope = 56
val AeronSink_DelegateToTaskRunner = 57
val AeronSink_ReturnFromTaskRunner = 58
// Aeron Source events
val AeronSource_Started = 22
val AeronSource_Stopped = 23
val AeronSource_Received = 24
val AeronSource_DelegateToTaskRunner = 25
val AeronSource_ReturnFromTaskRunner = 26
val AeronSource_Started = 70
val AeronSource_Stopped = 71
val AeronSource_Received = 72
val AeronSource_DelegateToTaskRunner = 72
val AeronSource_ReturnFromTaskRunner = 73
// Compression events
val Compression_CompressedActorRef = 25
val Compression_AllocatedActorRefCompressionId = 26
val Compression_CompressedManifest = 27
val Compression_AllocatedManifestCompressionId = 28
val Compression_CompressedActorRef = 90
val Compression_AllocatedActorRefCompressionId = 91
val Compression_CompressedManifest = 91
val Compression_AllocatedManifestCompressionId = 92
}

View file

@ -99,6 +99,7 @@ private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue
}
override def postStop(): Unit = {
// TODO quarantine will currently always be done when control stream is terminated, see issue #21359
if (consumerQueue ne null)
consumerQueue.clear()
super.postStop()

View file

@ -29,7 +29,6 @@ import akka.actor.ActorRef
*/
private[akka] object SystemMessageDelivery {
// FIXME serialization of these messages
// FIXME ackReplyTo should not be needed
final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress) extends ArteryMessage
final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply
final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply
@ -80,7 +79,6 @@ private[akka] class SystemMessageDelivery(
outboundContext.controlSubject.stopped.onComplete {
getAsyncCallback[Try[Done]] {
// FIXME quarantine
case Success(_) ⇒ completeStage()
case Failure(cause) ⇒ failStage(cause)
}.invoke
@ -88,6 +86,7 @@ private[akka] class SystemMessageDelivery(
}
override def postStop(): Unit = {
// TODO quarantine will currently always be done when control stream is terminated, see issue #21359
sendUnacknowledgedToDeadLetters()
unacknowledged.clear()
outboundContext.controlSubject.detach(this)

View file

@ -42,7 +42,9 @@ private[remote] final class InboundCompressionsImpl(
inboundContext: InboundContext,
settings: ArterySettings.Compression) extends InboundCompressions {
// FIXME we also must remove the ones that won't be used anymore - when quarantine triggers
// TODO we also must remove the ones that won't be used anymore - when quarantine triggers?
// Why is that important? Won't they naturally be removed in new advertisements since they
// are not used any more?
private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]()
private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] {
override def apply(originUid: Long): InboundActorRefCompression = {
@ -200,12 +202,6 @@ private[remote] abstract class InboundCompression[T >: Null](
lazy val log = Logging(system, getClass.getSimpleName)
// FIXME NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet,
// yet we technically could already prepare table 3, then it starts using table 1 suddenly. Edge cases like that.
// SOLUTION 1: We don't start building new tables until we've seen the previous one be used (move from new to active)
// This is nice as it practically disables all the "build the table" work when the other side is not interested in using it.
// SOLUTION 2: We end up dropping messages when old table comes in (we do that anyway)
private[this] val state: AtomicReference[InboundCompression.State[T]] = new AtomicReference(InboundCompression.State.empty)
// TODO calibrate properly (h/w have direct relation to preciseness and max capacity)
@ -224,7 +220,7 @@ private[remote] abstract class InboundCompression[T >: Null](
* @throws UnknownCompressedIdException if given id is not known, this may indicate a bug such situation should not happen.
*/
@tailrec final def decompressInternal(incomingTableVersion: Int, idx: Int, attemptCounter: Int): OptionVal[T] = {
// effectively should never loop more than once, to avoid infinite recursion blow up eagerly
// effectively should never loop more than once, to avoid infinite recursion blow up eagerly
if (attemptCounter > 2) throw new IllegalStateException(s"Unable to decompress $idx from table $incomingTableVersion. Internal state: ${state.get}")
val current = state.get

View file

@ -119,7 +119,7 @@ class ActorsLeakSpec extends AkkaSpec(ActorsLeakSpec.config) with ImplicitSender
val beforeQuarantineActors = targets.flatMap(collectLiveActors).toSet
// it must not quarantine the current connection
RARP(system).provider.transport.quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1))
RARP(system).provider.transport.quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1), "test")
// the message from local to remote should reuse passive inbound connection
system.actorSelection(RootActorPath(remoteAddress) / "user" / "stoppable") ! Identify(1)

View file

@ -53,7 +53,7 @@ object RemoteWatcherSpec {
// that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
override def quarantine(address: Address, uid: Option[Int]): Unit = {
override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
}

View file

@ -769,11 +769,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
inboundHandleProbe.expectNoMsg(1.second)
// Quarantine unrelated connection
RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1))
RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1), "test")
inboundHandleProbe.expectNoMsg(1.second)
// Quarantine the connection
RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID))
RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID), "test")
// Even though the connection is stashed it will be disassociated
inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated]

View file

@ -26,8 +26,6 @@ object RemoteDeathWatchSpec {
}
}
remote.watch-failure-detector.acceptable-heartbeat-pause = 3s
# FIXME do we need the initial-system-message-delivery-timeout?
remote.initial-system-message-delivery-timeout = 3 s
remote.artery.enabled = on
remote.artery.hostname = localhost
remote.artery.port = 0
@ -38,6 +36,9 @@ object RemoteDeathWatchSpec {
class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with ImplicitSender with DefaultTimeout with DeathWatchSpec {
import RemoteDeathWatchSpec._
system.eventStream.publish(TestEvent.Mute(
EventFilter[io.aeron.exceptions.RegistrationException]()))
val other = ActorSystem("other", ConfigFactory.parseString(s"akka.remote.artery.port=$otherPort")
.withFallback(system.settings.config))

View file

@ -54,7 +54,7 @@ object RemoteWatcherSpec {
// that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
override def quarantine(address: Address, uid: Option[Int]): Unit = {
override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
}

View file

@ -19,6 +19,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.util.OptionVal
import akka.actor.InternalActorRef
import akka.dispatch.ExecutionContexts
import com.typesafe.config.ConfigFactory
private[remote] class TestInboundContext(
override val localAddress: UniqueAddress,
@ -59,6 +60,9 @@ private[remote] class TestInboundContext(
protected def createAssociation(remoteAddress: Address): TestOutboundContext =
new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe)
override lazy val settings: ArterySettings =
ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery"))
}
private[remote] class TestOutboundContext(
@ -94,6 +98,9 @@ private[remote] class TestOutboundContext(
OptionVal.None))
}
override lazy val settings: ArterySettings =
ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery"))
}
private[remote] class TestControlMessageSubject extends ControlMessageSubject {

View file

@ -59,7 +59,6 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
}
"Outgoing compression table" must {
// FIXME this is failing, we must rethink how tables are identified and updated
"be dropped on system restart" in {
val messagesToExchange = 10
val systemATransport = RARP(system).provider.transport.asInstanceOf[ArteryTransport]

View file

@ -966,6 +966,10 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat")
),
"2.4.10" -> Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine")
)
)
}