Merge pull request #20759 from akka/wip-20715-remote-deployment-patriknw
make remote deployment work with Artery, #20715
This commit is contained in:
commit
39f6a9dcf3
13 changed files with 195 additions and 94 deletions
|
|
@ -37,13 +37,12 @@ class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec(
|
|||
class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
|
||||
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false))
|
||||
|
||||
// FIXME this test is failing for Artery, a DeathWatchNotification is not delivered as expected?
|
||||
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec(
|
||||
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec(
|
||||
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||
//class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
|
||||
// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||
class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec(
|
||||
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||
class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec(
|
||||
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||
class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec(
|
||||
new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true))
|
||||
|
||||
object AttemptSysMsgRedeliverySpec {
|
||||
class Echo extends Actor {
|
||||
|
|
|
|||
|
|
@ -82,11 +82,6 @@ abstract class RemoteRestartedQuarantinedSpec
|
|||
runOn(first) {
|
||||
val secondAddress = node(second).address
|
||||
|
||||
// FIXME this should not be needed, see issue #20566
|
||||
within(30.seconds) {
|
||||
identifyWithUid(second, "subject", 1.seconds)
|
||||
}
|
||||
|
||||
val (uid, ref) = identifyWithUid(second, "subject", 5.seconds)
|
||||
|
||||
enterBarrier("before-quarantined")
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.remote
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.{ VirtualPathContainer, Deploy, Props, Nobody, InternalActorRef, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated }
|
||||
|
|
@ -22,6 +23,7 @@ import akka.actor.EmptyLocalActorRef
|
|||
import akka.event.AddressTerminatedTopic
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.dispatch.sysmsg.Unwatch
|
||||
import akka.NotUsed
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -57,6 +59,7 @@ private[akka] class RemoteSystemDaemon(
|
|||
AddressTerminatedTopic(system).subscribe(this)
|
||||
|
||||
private val parent2children = new ConcurrentHashMap[ActorRef, Set[ActorRef]]
|
||||
private val dedupDaemonMsgCreateMessages = new ConcurrentHashMap[String, NotUsed]
|
||||
|
||||
@tailrec private def addChildParentNeedsWatch(parent: ActorRef, child: ActorRef): Boolean =
|
||||
parent2children.get(parent) match {
|
||||
|
|
@ -138,32 +141,41 @@ private[akka] class RemoteSystemDaemon(
|
|||
case message: DaemonMsg ⇒
|
||||
log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address)
|
||||
message match {
|
||||
case DaemonMsgCreate(_, _, path, _) if untrustedMode ⇒ log.debug("does not accept deployments (untrusted) for [{}]", path)
|
||||
case DaemonMsgCreate(_, _, path, _) if untrustedMode ⇒
|
||||
log.debug("does not accept deployments (untrusted) for [{}]", path)
|
||||
case DaemonMsgCreate(props, deploy, path, supervisor) ⇒
|
||||
path match {
|
||||
case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒
|
||||
// TODO RK currently the extracted “address” is just ignored, is that okay?
|
||||
// TODO RK canonicalize path so as not to duplicate it always #1446
|
||||
val subpath = elems.drop(1)
|
||||
val p = this.path / subpath
|
||||
val childName = {
|
||||
val s = subpath.mkString("/")
|
||||
val i = s.indexOf('#')
|
||||
if (i < 0) s
|
||||
else s.substring(0, i)
|
||||
}
|
||||
val isTerminating = !terminating.whileOff {
|
||||
val parent = supervisor.asInstanceOf[InternalActorRef]
|
||||
val actor = system.provider.actorOf(system, props, parent,
|
||||
p, systemService = false, Some(deploy), lookupDeploy = true, async = false)
|
||||
addChild(childName, actor)
|
||||
actor.sendSystemMessage(Watch(actor, this))
|
||||
actor.start()
|
||||
if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this))
|
||||
}
|
||||
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address)
|
||||
case _ ⇒
|
||||
log.debug("remote path does not match path from message [{}]", message)
|
||||
// Artery sends multiple DaemonMsgCreate over several streams to preserve ordering assumptions,
|
||||
// DaemonMsgCreate for this unique path is already handled and therefore deduplicated
|
||||
if (dedupDaemonMsgCreateMessages.putIfAbsent(path, NotUsed) == null) {
|
||||
// we only need to keep the dedup info for a short period
|
||||
// this is not a real actor, so no point in scheduling message
|
||||
system.scheduler.scheduleOnce(5.seconds)(dedupDaemonMsgCreateMessages.remove(path))(system.dispatcher)
|
||||
|
||||
path match {
|
||||
case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒
|
||||
// TODO RK currently the extracted “address” is just ignored, is that okay?
|
||||
// TODO RK canonicalize path so as not to duplicate it always #1446
|
||||
val subpath = elems.drop(1)
|
||||
val p = this.path / subpath
|
||||
val childName = {
|
||||
val s = subpath.mkString("/")
|
||||
val i = s.indexOf('#')
|
||||
if (i < 0) s
|
||||
else s.substring(0, i)
|
||||
}
|
||||
val isTerminating = !terminating.whileOff {
|
||||
val parent = supervisor.asInstanceOf[InternalActorRef]
|
||||
val actor = system.provider.actorOf(system, props, parent,
|
||||
p, systemService = false, Some(deploy), lookupDeploy = true, async = false)
|
||||
addChild(childName, actor)
|
||||
actor.sendSystemMessage(Watch(actor, this))
|
||||
actor.start()
|
||||
if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this))
|
||||
}
|
||||
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address)
|
||||
case _ ⇒
|
||||
log.debug("remote path does not match path from message [{}]", message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -79,14 +79,14 @@ import akka.util.OptionVal
|
|||
*/
|
||||
private[akka] object InboundEnvelope {
|
||||
def apply(
|
||||
recipient: InternalActorRef,
|
||||
recipient: OptionVal[InternalActorRef],
|
||||
recipientAddress: Address,
|
||||
message: AnyRef,
|
||||
senderOption: OptionVal[ActorRef],
|
||||
sender: OptionVal[ActorRef],
|
||||
originUid: Long,
|
||||
association: OptionVal[OutboundContext]): InboundEnvelope = {
|
||||
val env = new ReusableInboundEnvelope
|
||||
env.init(recipient, recipientAddress, message, senderOption, originUid, association)
|
||||
env.init(recipient, recipientAddress, message, sender, originUid, association)
|
||||
env
|
||||
}
|
||||
|
||||
|
|
@ -96,31 +96,33 @@ private[akka] object InboundEnvelope {
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] trait InboundEnvelope {
|
||||
def recipient: InternalActorRef
|
||||
def recipient: OptionVal[InternalActorRef]
|
||||
def recipientAddress: Address
|
||||
def message: AnyRef
|
||||
def senderOption: OptionVal[ActorRef]
|
||||
def sender: OptionVal[ActorRef]
|
||||
def originUid: Long
|
||||
def association: OptionVal[OutboundContext]
|
||||
|
||||
def withMessage(message: AnyRef): InboundEnvelope
|
||||
|
||||
def withRecipient(ref: InternalActorRef): InboundEnvelope
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
|
||||
private var _recipient: InternalActorRef = null
|
||||
private var _recipient: OptionVal[InternalActorRef] = OptionVal.None
|
||||
private var _recipientAddress: Address = null
|
||||
private var _message: AnyRef = null
|
||||
private var _senderOption: OptionVal[ActorRef] = OptionVal.None
|
||||
private var _sender: OptionVal[ActorRef] = OptionVal.None
|
||||
private var _originUid: Long = 0L
|
||||
private var _association: OptionVal[OutboundContext] = OptionVal.None
|
||||
|
||||
override def recipient: InternalActorRef = _recipient
|
||||
override def recipient: OptionVal[InternalActorRef] = _recipient
|
||||
override def recipientAddress: Address = _recipientAddress
|
||||
override def message: AnyRef = _message
|
||||
override def senderOption: OptionVal[ActorRef] = _senderOption
|
||||
override def sender: OptionVal[ActorRef] = _sender
|
||||
override def originUid: Long = _originUid
|
||||
override def association: OptionVal[OutboundContext] = _association
|
||||
|
||||
|
|
@ -129,32 +131,37 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
|
|||
this
|
||||
}
|
||||
|
||||
def withRecipient(ref: InternalActorRef): InboundEnvelope = {
|
||||
_recipient = OptionVal(ref)
|
||||
this
|
||||
}
|
||||
|
||||
def clear(): Unit = {
|
||||
_recipient = null
|
||||
_recipient = OptionVal.None
|
||||
_recipientAddress = null
|
||||
_message = null
|
||||
_senderOption = OptionVal.None
|
||||
_sender = OptionVal.None
|
||||
_originUid = 0L
|
||||
_association = OptionVal.None
|
||||
}
|
||||
|
||||
def init(
|
||||
recipient: InternalActorRef,
|
||||
recipient: OptionVal[InternalActorRef],
|
||||
recipientAddress: Address,
|
||||
message: AnyRef,
|
||||
senderOption: OptionVal[ActorRef],
|
||||
sender: OptionVal[ActorRef],
|
||||
originUid: Long,
|
||||
association: OptionVal[OutboundContext]): Unit = {
|
||||
_recipient = recipient
|
||||
_recipientAddress = recipientAddress
|
||||
_message = message
|
||||
_senderOption = senderOption
|
||||
_sender = sender
|
||||
_originUid = originUid
|
||||
_association = association
|
||||
}
|
||||
|
||||
override def toString: String =
|
||||
s"InboundEnvelope($recipient, $recipientAddress, $message, $senderOption, $originUid, $association)"
|
||||
s"InboundEnvelope($recipient, $recipientAddress, $message, $sender, $originUid, $association)"
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -647,7 +654,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
override def sendControl(to: Address, message: ControlMessage) =
|
||||
association(to).sendControl(message)
|
||||
|
||||
override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = {
|
||||
override def send(message: Any, sender: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = {
|
||||
val cached = recipient.cachedAssociation
|
||||
|
||||
val a =
|
||||
|
|
@ -658,7 +665,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
a2
|
||||
}
|
||||
|
||||
a.send(message, senderOption, recipient)
|
||||
a.send(message, sender, recipient)
|
||||
}
|
||||
|
||||
override def association(remoteAddress: Address): Association =
|
||||
|
|
@ -719,7 +726,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
flightRecorder.createEventSink()))
|
||||
|
||||
val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒
|
||||
messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption)
|
||||
messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender)
|
||||
inboundEnvelopePool.release(m)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import akka.util.{ Unsafe, WildcardTree }
|
|||
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
|
||||
import akka.util.OptionVal
|
||||
import akka.remote.QuarantinedEvent
|
||||
import akka.remote.DaemonMsgCreate
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -159,20 +160,31 @@ private[remote] class Association(
|
|||
override def sendControl(message: ControlMessage): Unit =
|
||||
outboundControlIngress.sendControlMessage(message)
|
||||
|
||||
def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = {
|
||||
def send(message: Any, sender: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = {
|
||||
// allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
|
||||
// FIXME where is that ActorSelectionMessage check in old remoting?
|
||||
if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
|
||||
// FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
|
||||
message match {
|
||||
case _: SystemMessage | ClearSystemMessageDelivery ⇒
|
||||
val send = Send(message, senderOption, recipient, None)
|
||||
val send = Send(message, sender, recipient, None)
|
||||
if (!controlQueue.offer(send)) {
|
||||
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
|
||||
transport.system.deadLetters ! send
|
||||
}
|
||||
case _: DaemonMsgCreate ⇒
|
||||
// DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
|
||||
// remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages.
|
||||
// It must also be sent over the ordinary message stream so that it arrives (and creates the
|
||||
// destination) before the first ordinary message arrives.
|
||||
val send1 = Send(message, sender, recipient, None)
|
||||
if (!controlQueue.offer(send1))
|
||||
transport.system.deadLetters ! send1
|
||||
val send2 = Send(message, sender, recipient, None)
|
||||
if (!queue.offer(send2))
|
||||
transport.system.deadLetters ! send2
|
||||
case _ ⇒
|
||||
val send = Send(message, senderOption, recipient, None)
|
||||
val send = Send(message, sender, recipient, None)
|
||||
val offerOk =
|
||||
if (largeMessageChannelEnabled && isLargeMessageDestination(recipient))
|
||||
largeQueue.offer(send)
|
||||
|
|
|
|||
|
|
@ -99,6 +99,9 @@ sealed trait HeaderBuilder {
|
|||
def setNoSender(): Unit
|
||||
def isNoSender: Boolean
|
||||
|
||||
def setNoRecipient(): Unit
|
||||
def isNoRecipient: Boolean
|
||||
|
||||
def recipientActorRef_=(ref: String): Unit
|
||||
def recipientActorRef: String
|
||||
|
||||
|
|
@ -147,6 +150,14 @@ private[remote] final class HeaderBuilderImpl(val compressionTable: LiteralCompr
|
|||
}
|
||||
}
|
||||
|
||||
def setNoRecipient(): Unit = {
|
||||
_recipientActorRef = null
|
||||
_recipientActorRefIdx = EnvelopeBuffer.DeadLettersCode
|
||||
}
|
||||
|
||||
def isNoRecipient: Boolean =
|
||||
(_recipientActorRef eq null) && _recipientActorRefIdx == EnvelopeBuffer.DeadLettersCode
|
||||
|
||||
def recipientActorRef_=(ref: String): Unit = {
|
||||
_recipientActorRef = ref
|
||||
_recipientActorRefIdx = compressionTable.compressActorRef(ref)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.{ ActorRef, InternalActorRef }
|
||||
import akka.actor.ActorSystem
|
||||
|
|
@ -11,9 +12,13 @@ import akka.serialization.{ Serialization, SerializationExtension }
|
|||
import akka.stream._
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
|
||||
import akka.util.OptionVal
|
||||
import akka.actor.EmptyLocalActorRef
|
||||
import akka.stream.stage.TimerGraphStageLogic
|
||||
|
||||
// TODO: Long UID
|
||||
class Encoder(
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[remote] class Encoder(
|
||||
uniqueLocalAddress: UniqueAddress,
|
||||
system: ActorSystem,
|
||||
compressionTable: LiteralCompressionTable,
|
||||
|
|
@ -104,7 +109,20 @@ class Encoder(
|
|||
}
|
||||
}
|
||||
|
||||
class Decoder(
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[remote] object Decoder {
|
||||
private final case class RetryResolveRemoteDeployedRecipient(
|
||||
attemptsLeft: Int,
|
||||
recipientPath: String,
|
||||
inboundEnvelope: InboundEnvelope)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[remote] class Decoder(
|
||||
inboundContext: InboundContext,
|
||||
system: ExtendedActorSystem,
|
||||
resolveActorRefWithLocalAddress: String ⇒ InternalActorRef,
|
||||
|
|
@ -116,7 +134,8 @@ class Decoder(
|
|||
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
|
||||
new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
|
||||
import Decoder.RetryResolveRemoteDeployedRecipient
|
||||
private val localAddress = inboundContext.localAddress.address
|
||||
private val headerBuilder = HeaderBuilder(compressionTable)
|
||||
private val serialization = SerializationExtension(system)
|
||||
|
|
@ -124,6 +143,9 @@ class Decoder(
|
|||
private val recipientCache = new java.util.HashMap[String, InternalActorRef]
|
||||
private val senderCache = new java.util.HashMap[String, ActorRef]
|
||||
|
||||
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
|
||||
private val retryResolveRemoteDeployedRecipientAttempts = 20
|
||||
|
||||
override protected def logSource = classOf[Decoder]
|
||||
|
||||
override def onPush(): Unit = {
|
||||
|
|
@ -133,18 +155,8 @@ class Decoder(
|
|||
// FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances
|
||||
// in case of compression is enabled
|
||||
// FIXME: Is localAddress really needed?
|
||||
val recipient: InternalActorRef = recipientCache.get(headerBuilder.recipientActorRef) match {
|
||||
case null ⇒
|
||||
val ref = resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef)
|
||||
// FIXME we might need an efficient LRU cache, or replaced by compression table
|
||||
if (recipientCache.size() >= 1000)
|
||||
recipientCache.clear()
|
||||
recipientCache.put(headerBuilder.recipientActorRef, ref)
|
||||
ref
|
||||
case ref ⇒ ref
|
||||
}
|
||||
|
||||
val senderOption =
|
||||
val sender =
|
||||
if (headerBuilder.isNoSender)
|
||||
OptionVal.None
|
||||
else {
|
||||
|
|
@ -160,6 +172,12 @@ class Decoder(
|
|||
}
|
||||
}
|
||||
|
||||
val recipient =
|
||||
if (headerBuilder.isNoRecipient)
|
||||
OptionVal.None
|
||||
else
|
||||
resolveRecipient(headerBuilder.recipientActorRef)
|
||||
|
||||
val originUid = headerBuilder.uid
|
||||
val association = inboundContext.association(originUid)
|
||||
|
||||
|
|
@ -172,11 +190,18 @@ class Decoder(
|
|||
recipient,
|
||||
localAddress, // FIXME: Is this needed anymore? What should we do here?
|
||||
deserializedMessage,
|
||||
senderOption,
|
||||
sender,
|
||||
originUid,
|
||||
association)
|
||||
|
||||
push(out, decoded)
|
||||
if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
|
||||
// the remote deployed actor might not be created yet when resolving the
|
||||
// recipient for the first message that is sent to it, best effort retry
|
||||
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
||||
retryResolveRemoteDeployedRecipientAttempts,
|
||||
headerBuilder.recipientActorRef, decoded), retryResolveRemoteDeployedRecipientInterval)
|
||||
} else
|
||||
push(out, decoded)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
log.warning(
|
||||
|
|
@ -188,8 +213,56 @@ class Decoder(
|
|||
}
|
||||
}
|
||||
|
||||
private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
|
||||
recipientCache.get(path) match {
|
||||
case null ⇒
|
||||
def addToCache(resolved: InternalActorRef): Unit = {
|
||||
// FIXME we might need an efficient LRU cache, or replaced by compression table
|
||||
if (recipientCache.size() >= 1000)
|
||||
recipientCache.clear()
|
||||
recipientCache.put(path, resolved)
|
||||
}
|
||||
|
||||
resolveActorRefWithLocalAddress(path) match {
|
||||
case empty: EmptyLocalActorRef ⇒
|
||||
val pathElements = empty.path.elements
|
||||
if (pathElements.nonEmpty && pathElements.head == "remote")
|
||||
OptionVal.None
|
||||
else {
|
||||
addToCache(empty)
|
||||
OptionVal(empty)
|
||||
}
|
||||
case ref ⇒
|
||||
addToCache(ref)
|
||||
OptionVal(ref)
|
||||
}
|
||||
case ref ⇒ OptionVal(ref)
|
||||
}
|
||||
}
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit = {
|
||||
timerKey match {
|
||||
case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) ⇒
|
||||
resolveRecipient(recipientPath) match {
|
||||
case OptionVal.None ⇒
|
||||
if (attemptsLeft > 0)
|
||||
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
||||
attemptsLeft - 1,
|
||||
headerBuilder.recipientActorRef, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
|
||||
else {
|
||||
val recipient = resolveActorRefWithLocalAddress(recipientPath)
|
||||
// only retry for the first message
|
||||
recipientCache.put(recipientPath, recipient)
|
||||
push(out, inboundEnvelope.withRecipient(recipient))
|
||||
}
|
||||
case OptionVal.Some(recipient) ⇒
|
||||
push(out, inboundEnvelope.withRecipient(recipient))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender {
|
|||
"be emitted via side channel" in {
|
||||
val observerProbe = TestProbe()
|
||||
val inboundContext = new TestInboundContext(localAddress = addressB)
|
||||
val recipient = null.asInstanceOf[InternalActorRef] // not used
|
||||
val recipient = OptionVal.None // not used
|
||||
|
||||
val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef]
|
||||
.map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid, OptionVal.None))
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
|
|||
val addressB = UniqueAddress(Address("artery", "sysB", "hostB", 1002), 2)
|
||||
|
||||
private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = {
|
||||
val recipient = null.asInstanceOf[InternalActorRef] // not used
|
||||
val recipient = OptionVal.None // not used
|
||||
TestSource.probe[AnyRef]
|
||||
.map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid,
|
||||
inboundContext.association(addressA.uid)))
|
||||
|
|
|
|||
|
|
@ -59,8 +59,7 @@ class RemoteDeploymentSpec extends AkkaSpec("""
|
|||
|
||||
"Remoting" must {
|
||||
|
||||
// FIXME this test is failing with Artery
|
||||
"create and supervise children on remote node" ignore {
|
||||
"create and supervise children on remote node" in {
|
||||
val senderProbe = TestProbe()(masterSystem)
|
||||
val r = masterSystem.actorOf(Props[Echo1], "blub")
|
||||
r.path.toString should ===(s"artery://${system.name}@localhost:${port}/remote/artery/${masterSystem.name}@localhost:${masterPort}/user/blub")
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ object RemoteRouterSpec {
|
|||
}
|
||||
|
||||
class RemoteRouterSpec extends AkkaSpec("""
|
||||
akka.loglevel=DEBUG
|
||||
akka.actor.provider = remote
|
||||
akka.remote.artery.enabled = on
|
||||
akka.remote.artery.hostname = localhost
|
||||
|
|
@ -99,8 +98,7 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
|
||||
"A Remote Router" must {
|
||||
|
||||
// FIXME this test is failing with Artery
|
||||
"deploy its children on remote host driven by configuration" ignore {
|
||||
"deploy its children on remote host driven by configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps), "blub")
|
||||
val replies = collectRouteePaths(probe, router, 5)
|
||||
|
|
@ -112,8 +110,7 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
masterSystem.stop(router)
|
||||
}
|
||||
|
||||
// FIXME this test is failing with Artery
|
||||
"deploy its children on remote host driven by programatic definition" ignore {
|
||||
"deploy its children on remote host driven by programatic definition" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(new RemoteRouterConfig(
|
||||
RoundRobinPool(2),
|
||||
|
|
@ -126,8 +123,7 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
masterSystem.stop(router)
|
||||
}
|
||||
|
||||
// FIXME this test is failing with Artery
|
||||
"deploy dynamic resizable number of children on remote host driven by configuration" ignore {
|
||||
"deploy dynamic resizable number of children on remote host driven by configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(FromConfig.props(echoActorProps), "elastic-blub")
|
||||
val replies = collectRouteePaths(probe, router, 5000)
|
||||
|
|
@ -152,8 +148,7 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
masterSystem.stop(router)
|
||||
}
|
||||
|
||||
// FIXME this test is failing with Artery
|
||||
"deploy remote routers based on explicit deployment" ignore {
|
||||
"deploy remote routers based on explicit deployment" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps)
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"artery://${sysName}@localhost:${port}")))), "remote-blub2")
|
||||
|
|
@ -168,8 +163,7 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
masterSystem.stop(router)
|
||||
}
|
||||
|
||||
// FIXME this test is failing with Artery
|
||||
"let remote deployment be overridden by local configuration" ignore {
|
||||
"let remote deployment be overridden by local configuration" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps)
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"artery://${sysName}@localhost:${port}")))), "local-blub")
|
||||
|
|
@ -214,8 +208,7 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
masterSystem.stop(router)
|
||||
}
|
||||
|
||||
// FIXME this test is failing with Artery
|
||||
"set supplied supervisorStrategy" ignore {
|
||||
"set supplied supervisorStrategy" in {
|
||||
val probe = TestProbe()(masterSystem)
|
||||
val escalator = OneForOneStrategy() {
|
||||
case e ⇒ probe.ref ! e; SupervisorStrategy.Escalate
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
|
|||
}
|
||||
|
||||
private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = {
|
||||
val recipient = null.asInstanceOf[InternalActorRef] // not used
|
||||
val recipient = OptionVal.None // not used
|
||||
Flow[Send]
|
||||
.map {
|
||||
case Send(sysEnv: SystemMessageEnvelope, _, _, _) ⇒
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ private[akka] class TestOutboundContext(
|
|||
|
||||
override def sendControl(message: ControlMessage) = {
|
||||
controlProbe.foreach(_ ! message)
|
||||
controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, OptionVal.None, localAddress.uid,
|
||||
controlSubject.sendControl(InboundEnvelope(OptionVal.None, remoteAddress, message, OptionVal.None, localAddress.uid,
|
||||
OptionVal.None))
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue