From 5c234940c60b1f862f58ed1bb683798b0a4b8cae Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 9 Jun 2016 09:16:44 +0200 Subject: [PATCH] make remote deployment work with Artery, #20715 There were two related problems with remote deployment when using Artery. * DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages. It must also be sent over the ordinary message stream so that it arrives (and creates the destination) before the first ordinary message arrives. * The first point solves the creation of the remote deployed actor but it's not enough. Resolve of the recipient actor ref may still happen before the actor is created. This is solved by retrying the resolve for the first message of a remote deployed actor. --- .../remote/AttemptSysMsgRedeliverySpec.scala | 13 +-- .../RemoteRestartedQuarantinedSpec.scala | 5 - .../main/scala/akka/remote/RemoteDaemon.scala | 62 ++++++---- .../akka/remote/artery/ArteryTransport.scala | 43 ++++--- .../akka/remote/artery/Association.scala | 18 ++- .../scala/akka/remote/artery/BufferPool.scala | 11 ++ .../scala/akka/remote/artery/Codecs.scala | 107 +++++++++++++++--- .../artery/InboundControlJunctionSpec.scala | 2 +- .../remote/artery/InboundHandshakeSpec.scala | 2 +- .../remote/artery/RemoteDeploymentSpec.scala | 3 +- .../akka/remote/artery/RemoteRouterSpec.scala | 19 +--- .../artery/SystemMessageDeliverySpec.scala | 2 +- .../akka/remote/artery/TestContext.scala | 2 +- 13 files changed, 195 insertions(+), 94 deletions(-) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala index f0a5327991..35badd1cc0 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala @@ -37,13 +37,12 @@ class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false)) -// FIXME this test is failing for Artery, a DeathWatchNotification is not delivered as expected? -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) object AttemptSysMsgRedeliverySpec { class Echo extends Actor { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala index 980aa51908..f34fae4742 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala @@ -82,11 +82,6 @@ abstract class RemoteRestartedQuarantinedSpec runOn(first) { val secondAddress = node(second).address - // FIXME this should not be needed, see issue #20566 - within(30.seconds) { - identifyWithUid(second, "subject", 1.seconds) - } - val (uid, ref) = identifyWithUid(second, "subject", 5.seconds) enterBarrier("before-quarantined") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 618b71aa3e..847821dbcc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -4,6 +4,7 @@ package akka.remote +import scala.concurrent.duration._ import scala.annotation.tailrec import scala.util.control.NonFatal import akka.actor.{ VirtualPathContainer, Deploy, Props, Nobody, InternalActorRef, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated } @@ -22,6 +23,7 @@ import akka.actor.EmptyLocalActorRef import akka.event.AddressTerminatedTopic import java.util.concurrent.ConcurrentHashMap import akka.dispatch.sysmsg.Unwatch +import akka.NotUsed /** * INTERNAL API @@ -57,6 +59,7 @@ private[akka] class RemoteSystemDaemon( AddressTerminatedTopic(system).subscribe(this) private val parent2children = new ConcurrentHashMap[ActorRef, Set[ActorRef]] + private val dedupDaemonMsgCreateMessages = new ConcurrentHashMap[String, NotUsed] @tailrec private def addChildParentNeedsWatch(parent: ActorRef, child: ActorRef): Boolean = parent2children.get(parent) match { @@ -138,32 +141,41 @@ private[akka] class RemoteSystemDaemon( case message: DaemonMsg ⇒ log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) message match { - case DaemonMsgCreate(_, _, path, _) if untrustedMode ⇒ log.debug("does not accept deployments (untrusted) for [{}]", path) + case DaemonMsgCreate(_, _, path, _) if untrustedMode ⇒ + log.debug("does not accept deployments (untrusted) for [{}]", path) case DaemonMsgCreate(props, deploy, path, supervisor) ⇒ - path match { - case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒ - // TODO RK currently the extracted “address” is just ignored, is that okay? - // TODO RK canonicalize path so as not to duplicate it always #1446 - val subpath = elems.drop(1) - val p = this.path / subpath - val childName = { - val s = subpath.mkString("/") - val i = s.indexOf('#') - if (i < 0) s - else s.substring(0, i) - } - val isTerminating = !terminating.whileOff { - val parent = supervisor.asInstanceOf[InternalActorRef] - val actor = system.provider.actorOf(system, props, parent, - p, systemService = false, Some(deploy), lookupDeploy = true, async = false) - addChild(childName, actor) - actor.sendSystemMessage(Watch(actor, this)) - actor.start() - if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this)) - } - if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address) - case _ ⇒ - log.debug("remote path does not match path from message [{}]", message) + // Artery sends multiple DaemonMsgCreate over several streams to preserve ordering assumptions, + // DaemonMsgCreate for this unique path is already handled and therefore deduplicated + if (dedupDaemonMsgCreateMessages.putIfAbsent(path, NotUsed) == null) { + // we only need to keep the dedup info for a short period + // this is not a real actor, so no point in scheduling message + system.scheduler.scheduleOnce(5.seconds)(dedupDaemonMsgCreateMessages.remove(path))(system.dispatcher) + + path match { + case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒ + // TODO RK currently the extracted “address” is just ignored, is that okay? + // TODO RK canonicalize path so as not to duplicate it always #1446 + val subpath = elems.drop(1) + val p = this.path / subpath + val childName = { + val s = subpath.mkString("/") + val i = s.indexOf('#') + if (i < 0) s + else s.substring(0, i) + } + val isTerminating = !terminating.whileOff { + val parent = supervisor.asInstanceOf[InternalActorRef] + val actor = system.provider.actorOf(system, props, parent, + p, systemService = false, Some(deploy), lookupDeploy = true, async = false) + addChild(childName, actor) + actor.sendSystemMessage(Watch(actor, this)) + actor.start() + if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this)) + } + if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address) + case _ ⇒ + log.debug("remote path does not match path from message [{}]", message) + } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 9edaa01bf2..fdeafef12d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -79,14 +79,14 @@ import akka.util.OptionVal */ private[akka] object InboundEnvelope { def apply( - recipient: InternalActorRef, + recipient: OptionVal[InternalActorRef], recipientAddress: Address, message: AnyRef, - senderOption: OptionVal[ActorRef], + sender: OptionVal[ActorRef], originUid: Long, association: OptionVal[OutboundContext]): InboundEnvelope = { val env = new ReusableInboundEnvelope - env.init(recipient, recipientAddress, message, senderOption, originUid, association) + env.init(recipient, recipientAddress, message, sender, originUid, association) env } @@ -96,31 +96,33 @@ private[akka] object InboundEnvelope { * INTERNAL API */ private[akka] trait InboundEnvelope { - def recipient: InternalActorRef + def recipient: OptionVal[InternalActorRef] def recipientAddress: Address def message: AnyRef - def senderOption: OptionVal[ActorRef] + def sender: OptionVal[ActorRef] def originUid: Long def association: OptionVal[OutboundContext] def withMessage(message: AnyRef): InboundEnvelope + + def withRecipient(ref: InternalActorRef): InboundEnvelope } /** * INTERNAL API */ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { - private var _recipient: InternalActorRef = null + private var _recipient: OptionVal[InternalActorRef] = OptionVal.None private var _recipientAddress: Address = null private var _message: AnyRef = null - private var _senderOption: OptionVal[ActorRef] = OptionVal.None + private var _sender: OptionVal[ActorRef] = OptionVal.None private var _originUid: Long = 0L private var _association: OptionVal[OutboundContext] = OptionVal.None - override def recipient: InternalActorRef = _recipient + override def recipient: OptionVal[InternalActorRef] = _recipient override def recipientAddress: Address = _recipientAddress override def message: AnyRef = _message - override def senderOption: OptionVal[ActorRef] = _senderOption + override def sender: OptionVal[ActorRef] = _sender override def originUid: Long = _originUid override def association: OptionVal[OutboundContext] = _association @@ -129,32 +131,37 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { this } + def withRecipient(ref: InternalActorRef): InboundEnvelope = { + _recipient = OptionVal(ref) + this + } + def clear(): Unit = { - _recipient = null + _recipient = OptionVal.None _recipientAddress = null _message = null - _senderOption = OptionVal.None + _sender = OptionVal.None _originUid = 0L _association = OptionVal.None } def init( - recipient: InternalActorRef, + recipient: OptionVal[InternalActorRef], recipientAddress: Address, message: AnyRef, - senderOption: OptionVal[ActorRef], + sender: OptionVal[ActorRef], originUid: Long, association: OptionVal[OutboundContext]): Unit = { _recipient = recipient _recipientAddress = recipientAddress _message = message - _senderOption = senderOption + _sender = sender _originUid = originUid _association = association } override def toString: String = - s"InboundEnvelope($recipient, $recipientAddress, $message, $senderOption, $originUid, $association)" + s"InboundEnvelope($recipient, $recipientAddress, $message, $sender, $originUid, $association)" } /** @@ -647,7 +654,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def sendControl(to: Address, message: ControlMessage) = association(to).sendControl(message) - override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { + override def send(message: Any, sender: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation val a = @@ -658,7 +665,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R a2 } - a.send(message, senderOption, recipient) + a.send(message, sender, recipient) } override def association(remoteAddress: Address): Association = @@ -719,7 +726,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R flightRecorder.createEventSink())) val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ - messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption) + messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender) inboundEnvelopePool.release(m) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index ca90c9a366..c6d27f4c85 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -37,6 +37,7 @@ import akka.util.{ Unsafe, WildcardTree } import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.util.OptionVal import akka.remote.QuarantinedEvent +import akka.remote.DaemonMsgCreate /** * INTERNAL API @@ -159,20 +160,31 @@ private[remote] class Association( override def sendControl(message: ControlMessage): Unit = outboundControlIngress.sendControlMessage(message) - def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { + def send(message: Any, sender: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system // FIXME where is that ActorSelectionMessage check in old remoting? if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { case _: SystemMessage | ClearSystemMessageDelivery ⇒ - val send = Send(message, senderOption, recipient, None) + val send = Send(message, sender, recipient, None) if (!controlQueue.offer(send)) { quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") transport.system.deadLetters ! send } + case _: DaemonMsgCreate ⇒ + // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because + // remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages. + // It must also be sent over the ordinary message stream so that it arrives (and creates the + // destination) before the first ordinary message arrives. + val send1 = Send(message, sender, recipient, None) + if (!controlQueue.offer(send1)) + transport.system.deadLetters ! send1 + val send2 = Send(message, sender, recipient, None) + if (!queue.offer(send2)) + transport.system.deadLetters ! send2 case _ ⇒ - val send = Send(message, senderOption, recipient, None) + val send = Send(message, sender, recipient, None) val offerOk = if (largeMessageChannelEnabled && isLargeMessageDestination(recipient)) largeQueue.offer(send) diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index 8842d548b6..b649fd71ae 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -99,6 +99,9 @@ sealed trait HeaderBuilder { def setNoSender(): Unit def isNoSender: Boolean + def setNoRecipient(): Unit + def isNoRecipient: Boolean + def recipientActorRef_=(ref: String): Unit def recipientActorRef: String @@ -147,6 +150,14 @@ private[remote] final class HeaderBuilderImpl(val compressionTable: LiteralCompr } } + def setNoRecipient(): Unit = { + _recipientActorRef = null + _recipientActorRefIdx = EnvelopeBuffer.DeadLettersCode + } + + def isNoRecipient: Boolean = + (_recipientActorRef eq null) && _recipientActorRefIdx == EnvelopeBuffer.DeadLettersCode + def recipientActorRef_=(ref: String): Unit = { _recipientActorRef = ref _recipientActorRefIdx = compressionTable.compressActorRef(ref) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index db77bc75ae..614a3f92fa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -1,5 +1,6 @@ package akka.remote.artery +import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.actor.{ ActorRef, InternalActorRef } import akka.actor.ActorSystem @@ -11,9 +12,13 @@ import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.util.OptionVal +import akka.actor.EmptyLocalActorRef +import akka.stream.stage.TimerGraphStageLogic -// TODO: Long UID -class Encoder( +/** + * INTERNAL API + */ +private[remote] class Encoder( uniqueLocalAddress: UniqueAddress, system: ActorSystem, compressionTable: LiteralCompressionTable, @@ -104,7 +109,20 @@ class Encoder( } } -class Decoder( +/** + * INTERNAL API + */ +private[remote] object Decoder { + private final case class RetryResolveRemoteDeployedRecipient( + attemptsLeft: Int, + recipientPath: String, + inboundEnvelope: InboundEnvelope) +} + +/** + * INTERNAL API + */ +private[remote] class Decoder( inboundContext: InboundContext, system: ExtendedActorSystem, resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, @@ -116,7 +134,8 @@ class Decoder( val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + import Decoder.RetryResolveRemoteDeployedRecipient private val localAddress = inboundContext.localAddress.address private val headerBuilder = HeaderBuilder(compressionTable) private val serialization = SerializationExtension(system) @@ -124,6 +143,9 @@ class Decoder( private val recipientCache = new java.util.HashMap[String, InternalActorRef] private val senderCache = new java.util.HashMap[String, ActorRef] + private val retryResolveRemoteDeployedRecipientInterval = 50.millis + private val retryResolveRemoteDeployedRecipientAttempts = 20 + override protected def logSource = classOf[Decoder] override def onPush(): Unit = { @@ -133,18 +155,8 @@ class Decoder( // FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances // in case of compression is enabled // FIXME: Is localAddress really needed? - val recipient: InternalActorRef = recipientCache.get(headerBuilder.recipientActorRef) match { - case null ⇒ - val ref = resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef) - // FIXME we might need an efficient LRU cache, or replaced by compression table - if (recipientCache.size() >= 1000) - recipientCache.clear() - recipientCache.put(headerBuilder.recipientActorRef, ref) - ref - case ref ⇒ ref - } - val senderOption = + val sender = if (headerBuilder.isNoSender) OptionVal.None else { @@ -160,6 +172,12 @@ class Decoder( } } + val recipient = + if (headerBuilder.isNoRecipient) + OptionVal.None + else + resolveRecipient(headerBuilder.recipientActorRef) + val originUid = headerBuilder.uid val association = inboundContext.association(originUid) @@ -172,11 +190,18 @@ class Decoder( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? deserializedMessage, - senderOption, + sender, originUid, association) - push(out, decoded) + if (recipient.isEmpty && !headerBuilder.isNoRecipient) { + // the remote deployed actor might not be created yet when resolving the + // recipient for the first message that is sent to it, best effort retry + scheduleOnce(RetryResolveRemoteDeployedRecipient( + retryResolveRemoteDeployedRecipientAttempts, + headerBuilder.recipientActorRef, decoded), retryResolveRemoteDeployedRecipientInterval) + } else + push(out, decoded) } catch { case NonFatal(e) ⇒ log.warning( @@ -188,8 +213,56 @@ class Decoder( } } + private def resolveRecipient(path: String): OptionVal[InternalActorRef] = { + recipientCache.get(path) match { + case null ⇒ + def addToCache(resolved: InternalActorRef): Unit = { + // FIXME we might need an efficient LRU cache, or replaced by compression table + if (recipientCache.size() >= 1000) + recipientCache.clear() + recipientCache.put(path, resolved) + } + + resolveActorRefWithLocalAddress(path) match { + case empty: EmptyLocalActorRef ⇒ + val pathElements = empty.path.elements + if (pathElements.nonEmpty && pathElements.head == "remote") + OptionVal.None + else { + addToCache(empty) + OptionVal(empty) + } + case ref ⇒ + addToCache(ref) + OptionVal(ref) + } + case ref ⇒ OptionVal(ref) + } + } + override def onPull(): Unit = pull(in) + override protected def onTimer(timerKey: Any): Unit = { + timerKey match { + case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) ⇒ + resolveRecipient(recipientPath) match { + case OptionVal.None ⇒ + if (attemptsLeft > 0) + scheduleOnce(RetryResolveRemoteDeployedRecipient( + attemptsLeft - 1, + headerBuilder.recipientActorRef, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval) + else { + val recipient = resolveActorRefWithLocalAddress(recipientPath) + // only retry for the first message + recipientCache.put(recipientPath, recipient) + push(out, inboundEnvelope.withRecipient(recipient)) + } + case OptionVal.Some(recipient) ⇒ + push(out, inboundEnvelope.withRecipient(recipient)) + } + } + } + setHandlers(in, out, this) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala index 5afbdcec07..203169817c 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -39,7 +39,7 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender { "be emitted via side channel" in { val observerProbe = TestProbe() val inboundContext = new TestInboundContext(localAddress = addressB) - val recipient = null.asInstanceOf[InternalActorRef] // not used + val recipient = OptionVal.None // not used val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef] .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid, OptionVal.None)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala index fb369d4bd4..ac7b62529b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -39,7 +39,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { val addressB = UniqueAddress(Address("artery", "sysB", "hostB", 1002), 2) private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { - val recipient = null.asInstanceOf[InternalActorRef] // not used + val recipient = OptionVal.None // not used TestSource.probe[AnyRef] .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid, inboundContext.association(addressA.uid))) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala index c314cd5e0a..004547b081 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala @@ -59,8 +59,7 @@ class RemoteDeploymentSpec extends AkkaSpec(""" "Remoting" must { - // FIXME this test is failing with Artery - "create and supervise children on remote node" ignore { + "create and supervise children on remote node" in { val senderProbe = TestProbe()(masterSystem) val r = masterSystem.actorOf(Props[Echo1], "blub") r.path.toString should ===(s"artery://${system.name}@localhost:${port}/remote/artery/${masterSystem.name}@localhost:${masterPort}/user/blub") diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala index 6ce9ba3004..4d3380d03d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala @@ -22,7 +22,6 @@ object RemoteRouterSpec { } class RemoteRouterSpec extends AkkaSpec(""" - akka.loglevel=DEBUG akka.actor.provider = remote akka.remote.artery.enabled = on akka.remote.artery.hostname = localhost @@ -99,8 +98,7 @@ class RemoteRouterSpec extends AkkaSpec(""" "A Remote Router" must { - // FIXME this test is failing with Artery - "deploy its children on remote host driven by configuration" ignore { + "deploy its children on remote host driven by configuration" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps), "blub") val replies = collectRouteePaths(probe, router, 5) @@ -112,8 +110,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "deploy its children on remote host driven by programatic definition" ignore { + "deploy its children on remote host driven by programatic definition" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(new RemoteRouterConfig( RoundRobinPool(2), @@ -126,8 +123,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "deploy dynamic resizable number of children on remote host driven by configuration" ignore { + "deploy dynamic resizable number of children on remote host driven by configuration" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(FromConfig.props(echoActorProps), "elastic-blub") val replies = collectRouteePaths(probe, router, 5000) @@ -152,8 +148,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "deploy remote routers based on explicit deployment" ignore { + "deploy remote routers based on explicit deployment" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps) .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"artery://${sysName}@localhost:${port}")))), "remote-blub2") @@ -168,8 +163,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "let remote deployment be overridden by local configuration" ignore { + "let remote deployment be overridden by local configuration" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps) .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"artery://${sysName}@localhost:${port}")))), "local-blub") @@ -214,8 +208,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "set supplied supervisorStrategy" ignore { + "set supplied supervisorStrategy" in { val probe = TestProbe()(masterSystem) val escalator = OneForOneStrategy() { case e ⇒ probe.ref ! e; SupervisorStrategy.Escalate diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index b964a1c741..36b8340a4b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -73,7 +73,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi } private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = { - val recipient = null.asInstanceOf[InternalActorRef] // not used + val recipient = OptionVal.None // not used Flow[Send] .map { case Send(sysEnv: SystemMessageEnvelope, _, _, _) ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 94763cda5c..34371d7673 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -85,7 +85,7 @@ private[akka] class TestOutboundContext( override def sendControl(message: ControlMessage) = { controlProbe.foreach(_ ! message) - controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, OptionVal.None, localAddress.uid, + controlSubject.sendControl(InboundEnvelope(OptionVal.None, remoteAddress, message, OptionVal.None, localAddress.uid, OptionVal.None)) }