diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala index f0a5327991..35badd1cc0 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala @@ -37,13 +37,12 @@ class AttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( class AttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( new AttemptSysMsgRedeliveryMultiJvmSpec(artery = false)) -// FIXME this test is failing for Artery, a DeathWatchNotification is not delivered as expected? -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) -//class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( -// new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode1 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode2 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) +class ArteryAttemptSysMsgRedeliveryMultiJvmNode3 extends AttemptSysMsgRedeliverySpec( + new AttemptSysMsgRedeliveryMultiJvmSpec(artery = true)) object AttemptSysMsgRedeliverySpec { class Echo extends Actor { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala index 980aa51908..f34fae4742 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala @@ -82,11 +82,6 @@ abstract class RemoteRestartedQuarantinedSpec runOn(first) { val secondAddress = node(second).address - // FIXME this should not be needed, see issue #20566 - within(30.seconds) { - identifyWithUid(second, "subject", 1.seconds) - } - val (uid, ref) = identifyWithUid(second, "subject", 5.seconds) enterBarrier("before-quarantined") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 618b71aa3e..847821dbcc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -4,6 +4,7 @@ package akka.remote +import scala.concurrent.duration._ import scala.annotation.tailrec import scala.util.control.NonFatal import akka.actor.{ VirtualPathContainer, Deploy, Props, Nobody, InternalActorRef, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated } @@ -22,6 +23,7 @@ import akka.actor.EmptyLocalActorRef import akka.event.AddressTerminatedTopic import java.util.concurrent.ConcurrentHashMap import akka.dispatch.sysmsg.Unwatch +import akka.NotUsed /** * INTERNAL API @@ -57,6 +59,7 @@ private[akka] class RemoteSystemDaemon( AddressTerminatedTopic(system).subscribe(this) private val parent2children = new ConcurrentHashMap[ActorRef, Set[ActorRef]] + private val dedupDaemonMsgCreateMessages = new ConcurrentHashMap[String, NotUsed] @tailrec private def addChildParentNeedsWatch(parent: ActorRef, child: ActorRef): Boolean = parent2children.get(parent) match { @@ -138,32 +141,41 @@ private[akka] class RemoteSystemDaemon( case message: DaemonMsg ⇒ log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) message match { - case DaemonMsgCreate(_, _, path, _) if untrustedMode ⇒ log.debug("does not accept deployments (untrusted) for [{}]", path) + case DaemonMsgCreate(_, _, path, _) if untrustedMode ⇒ + log.debug("does not accept deployments (untrusted) for [{}]", path) case DaemonMsgCreate(props, deploy, path, supervisor) ⇒ - path match { - case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒ - // TODO RK currently the extracted “address” is just ignored, is that okay? - // TODO RK canonicalize path so as not to duplicate it always #1446 - val subpath = elems.drop(1) - val p = this.path / subpath - val childName = { - val s = subpath.mkString("/") - val i = s.indexOf('#') - if (i < 0) s - else s.substring(0, i) - } - val isTerminating = !terminating.whileOff { - val parent = supervisor.asInstanceOf[InternalActorRef] - val actor = system.provider.actorOf(system, props, parent, - p, systemService = false, Some(deploy), lookupDeploy = true, async = false) - addChild(childName, actor) - actor.sendSystemMessage(Watch(actor, this)) - actor.start() - if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this)) - } - if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address) - case _ ⇒ - log.debug("remote path does not match path from message [{}]", message) + // Artery sends multiple DaemonMsgCreate over several streams to preserve ordering assumptions, + // DaemonMsgCreate for this unique path is already handled and therefore deduplicated + if (dedupDaemonMsgCreateMessages.putIfAbsent(path, NotUsed) == null) { + // we only need to keep the dedup info for a short period + // this is not a real actor, so no point in scheduling message + system.scheduler.scheduleOnce(5.seconds)(dedupDaemonMsgCreateMessages.remove(path))(system.dispatcher) + + path match { + case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒ + // TODO RK currently the extracted “address” is just ignored, is that okay? + // TODO RK canonicalize path so as not to duplicate it always #1446 + val subpath = elems.drop(1) + val p = this.path / subpath + val childName = { + val s = subpath.mkString("/") + val i = s.indexOf('#') + if (i < 0) s + else s.substring(0, i) + } + val isTerminating = !terminating.whileOff { + val parent = supervisor.asInstanceOf[InternalActorRef] + val actor = system.provider.actorOf(system, props, parent, + p, systemService = false, Some(deploy), lookupDeploy = true, async = false) + addChild(childName, actor) + actor.sendSystemMessage(Watch(actor, this)) + actor.start() + if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this)) + } + if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address) + case _ ⇒ + log.debug("remote path does not match path from message [{}]", message) + } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 9edaa01bf2..fdeafef12d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -79,14 +79,14 @@ import akka.util.OptionVal */ private[akka] object InboundEnvelope { def apply( - recipient: InternalActorRef, + recipient: OptionVal[InternalActorRef], recipientAddress: Address, message: AnyRef, - senderOption: OptionVal[ActorRef], + sender: OptionVal[ActorRef], originUid: Long, association: OptionVal[OutboundContext]): InboundEnvelope = { val env = new ReusableInboundEnvelope - env.init(recipient, recipientAddress, message, senderOption, originUid, association) + env.init(recipient, recipientAddress, message, sender, originUid, association) env } @@ -96,31 +96,33 @@ private[akka] object InboundEnvelope { * INTERNAL API */ private[akka] trait InboundEnvelope { - def recipient: InternalActorRef + def recipient: OptionVal[InternalActorRef] def recipientAddress: Address def message: AnyRef - def senderOption: OptionVal[ActorRef] + def sender: OptionVal[ActorRef] def originUid: Long def association: OptionVal[OutboundContext] def withMessage(message: AnyRef): InboundEnvelope + + def withRecipient(ref: InternalActorRef): InboundEnvelope } /** * INTERNAL API */ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { - private var _recipient: InternalActorRef = null + private var _recipient: OptionVal[InternalActorRef] = OptionVal.None private var _recipientAddress: Address = null private var _message: AnyRef = null - private var _senderOption: OptionVal[ActorRef] = OptionVal.None + private var _sender: OptionVal[ActorRef] = OptionVal.None private var _originUid: Long = 0L private var _association: OptionVal[OutboundContext] = OptionVal.None - override def recipient: InternalActorRef = _recipient + override def recipient: OptionVal[InternalActorRef] = _recipient override def recipientAddress: Address = _recipientAddress override def message: AnyRef = _message - override def senderOption: OptionVal[ActorRef] = _senderOption + override def sender: OptionVal[ActorRef] = _sender override def originUid: Long = _originUid override def association: OptionVal[OutboundContext] = _association @@ -129,32 +131,37 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { this } + def withRecipient(ref: InternalActorRef): InboundEnvelope = { + _recipient = OptionVal(ref) + this + } + def clear(): Unit = { - _recipient = null + _recipient = OptionVal.None _recipientAddress = null _message = null - _senderOption = OptionVal.None + _sender = OptionVal.None _originUid = 0L _association = OptionVal.None } def init( - recipient: InternalActorRef, + recipient: OptionVal[InternalActorRef], recipientAddress: Address, message: AnyRef, - senderOption: OptionVal[ActorRef], + sender: OptionVal[ActorRef], originUid: Long, association: OptionVal[OutboundContext]): Unit = { _recipient = recipient _recipientAddress = recipientAddress _message = message - _senderOption = senderOption + _sender = sender _originUid = originUid _association = association } override def toString: String = - s"InboundEnvelope($recipient, $recipientAddress, $message, $senderOption, $originUid, $association)" + s"InboundEnvelope($recipient, $recipientAddress, $message, $sender, $originUid, $association)" } /** @@ -647,7 +654,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def sendControl(to: Address, message: ControlMessage) = association(to).sendControl(message) - override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { + override def send(message: Any, sender: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation val a = @@ -658,7 +665,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R a2 } - a.send(message, senderOption, recipient) + a.send(message, sender, recipient) } override def association(remoteAddress: Address): Association = @@ -719,7 +726,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R flightRecorder.createEventSink())) val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ - messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption) + messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender) inboundEnvelopePool.release(m) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index ca90c9a366..c6d27f4c85 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -37,6 +37,7 @@ import akka.util.{ Unsafe, WildcardTree } import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.util.OptionVal import akka.remote.QuarantinedEvent +import akka.remote.DaemonMsgCreate /** * INTERNAL API @@ -159,20 +160,31 @@ private[remote] class Association( override def sendControl(message: ControlMessage): Unit = outboundControlIngress.sendControlMessage(message) - def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { + def send(message: Any, sender: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system // FIXME where is that ActorSelectionMessage check in old remoting? if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { case _: SystemMessage | ClearSystemMessageDelivery ⇒ - val send = Send(message, senderOption, recipient, None) + val send = Send(message, sender, recipient, None) if (!controlQueue.offer(send)) { quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") transport.system.deadLetters ! send } + case _: DaemonMsgCreate ⇒ + // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because + // remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages. + // It must also be sent over the ordinary message stream so that it arrives (and creates the + // destination) before the first ordinary message arrives. + val send1 = Send(message, sender, recipient, None) + if (!controlQueue.offer(send1)) + transport.system.deadLetters ! send1 + val send2 = Send(message, sender, recipient, None) + if (!queue.offer(send2)) + transport.system.deadLetters ! send2 case _ ⇒ - val send = Send(message, senderOption, recipient, None) + val send = Send(message, sender, recipient, None) val offerOk = if (largeMessageChannelEnabled && isLargeMessageDestination(recipient)) largeQueue.offer(send) diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index 8842d548b6..b649fd71ae 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -99,6 +99,9 @@ sealed trait HeaderBuilder { def setNoSender(): Unit def isNoSender: Boolean + def setNoRecipient(): Unit + def isNoRecipient: Boolean + def recipientActorRef_=(ref: String): Unit def recipientActorRef: String @@ -147,6 +150,14 @@ private[remote] final class HeaderBuilderImpl(val compressionTable: LiteralCompr } } + def setNoRecipient(): Unit = { + _recipientActorRef = null + _recipientActorRefIdx = EnvelopeBuffer.DeadLettersCode + } + + def isNoRecipient: Boolean = + (_recipientActorRef eq null) && _recipientActorRefIdx == EnvelopeBuffer.DeadLettersCode + def recipientActorRef_=(ref: String): Unit = { _recipientActorRef = ref _recipientActorRefIdx = compressionTable.compressActorRef(ref) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index db77bc75ae..614a3f92fa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -1,5 +1,6 @@ package akka.remote.artery +import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.actor.{ ActorRef, InternalActorRef } import akka.actor.ActorSystem @@ -11,9 +12,13 @@ import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.util.OptionVal +import akka.actor.EmptyLocalActorRef +import akka.stream.stage.TimerGraphStageLogic -// TODO: Long UID -class Encoder( +/** + * INTERNAL API + */ +private[remote] class Encoder( uniqueLocalAddress: UniqueAddress, system: ActorSystem, compressionTable: LiteralCompressionTable, @@ -104,7 +109,20 @@ class Encoder( } } -class Decoder( +/** + * INTERNAL API + */ +private[remote] object Decoder { + private final case class RetryResolveRemoteDeployedRecipient( + attemptsLeft: Int, + recipientPath: String, + inboundEnvelope: InboundEnvelope) +} + +/** + * INTERNAL API + */ +private[remote] class Decoder( inboundContext: InboundContext, system: ExtendedActorSystem, resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, @@ -116,7 +134,8 @@ class Decoder( val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + import Decoder.RetryResolveRemoteDeployedRecipient private val localAddress = inboundContext.localAddress.address private val headerBuilder = HeaderBuilder(compressionTable) private val serialization = SerializationExtension(system) @@ -124,6 +143,9 @@ class Decoder( private val recipientCache = new java.util.HashMap[String, InternalActorRef] private val senderCache = new java.util.HashMap[String, ActorRef] + private val retryResolveRemoteDeployedRecipientInterval = 50.millis + private val retryResolveRemoteDeployedRecipientAttempts = 20 + override protected def logSource = classOf[Decoder] override def onPush(): Unit = { @@ -133,18 +155,8 @@ class Decoder( // FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances // in case of compression is enabled // FIXME: Is localAddress really needed? - val recipient: InternalActorRef = recipientCache.get(headerBuilder.recipientActorRef) match { - case null ⇒ - val ref = resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef) - // FIXME we might need an efficient LRU cache, or replaced by compression table - if (recipientCache.size() >= 1000) - recipientCache.clear() - recipientCache.put(headerBuilder.recipientActorRef, ref) - ref - case ref ⇒ ref - } - val senderOption = + val sender = if (headerBuilder.isNoSender) OptionVal.None else { @@ -160,6 +172,12 @@ class Decoder( } } + val recipient = + if (headerBuilder.isNoRecipient) + OptionVal.None + else + resolveRecipient(headerBuilder.recipientActorRef) + val originUid = headerBuilder.uid val association = inboundContext.association(originUid) @@ -172,11 +190,18 @@ class Decoder( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? deserializedMessage, - senderOption, + sender, originUid, association) - push(out, decoded) + if (recipient.isEmpty && !headerBuilder.isNoRecipient) { + // the remote deployed actor might not be created yet when resolving the + // recipient for the first message that is sent to it, best effort retry + scheduleOnce(RetryResolveRemoteDeployedRecipient( + retryResolveRemoteDeployedRecipientAttempts, + headerBuilder.recipientActorRef, decoded), retryResolveRemoteDeployedRecipientInterval) + } else + push(out, decoded) } catch { case NonFatal(e) ⇒ log.warning( @@ -188,8 +213,56 @@ class Decoder( } } + private def resolveRecipient(path: String): OptionVal[InternalActorRef] = { + recipientCache.get(path) match { + case null ⇒ + def addToCache(resolved: InternalActorRef): Unit = { + // FIXME we might need an efficient LRU cache, or replaced by compression table + if (recipientCache.size() >= 1000) + recipientCache.clear() + recipientCache.put(path, resolved) + } + + resolveActorRefWithLocalAddress(path) match { + case empty: EmptyLocalActorRef ⇒ + val pathElements = empty.path.elements + if (pathElements.nonEmpty && pathElements.head == "remote") + OptionVal.None + else { + addToCache(empty) + OptionVal(empty) + } + case ref ⇒ + addToCache(ref) + OptionVal(ref) + } + case ref ⇒ OptionVal(ref) + } + } + override def onPull(): Unit = pull(in) + override protected def onTimer(timerKey: Any): Unit = { + timerKey match { + case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) ⇒ + resolveRecipient(recipientPath) match { + case OptionVal.None ⇒ + if (attemptsLeft > 0) + scheduleOnce(RetryResolveRemoteDeployedRecipient( + attemptsLeft - 1, + headerBuilder.recipientActorRef, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval) + else { + val recipient = resolveActorRefWithLocalAddress(recipientPath) + // only retry for the first message + recipientCache.put(recipientPath, recipient) + push(out, inboundEnvelope.withRecipient(recipient)) + } + case OptionVal.Some(recipient) ⇒ + push(out, inboundEnvelope.withRecipient(recipient)) + } + } + } + setHandlers(in, out, this) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala index 5afbdcec07..203169817c 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -39,7 +39,7 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender { "be emitted via side channel" in { val observerProbe = TestProbe() val inboundContext = new TestInboundContext(localAddress = addressB) - val recipient = null.asInstanceOf[InternalActorRef] // not used + val recipient = OptionVal.None // not used val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef] .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid, OptionVal.None)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala index fb369d4bd4..ac7b62529b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -39,7 +39,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { val addressB = UniqueAddress(Address("artery", "sysB", "hostB", 1002), 2) private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { - val recipient = null.asInstanceOf[InternalActorRef] // not used + val recipient = OptionVal.None // not used TestSource.probe[AnyRef] .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid, inboundContext.association(addressA.uid))) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala index c314cd5e0a..004547b081 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala @@ -59,8 +59,7 @@ class RemoteDeploymentSpec extends AkkaSpec(""" "Remoting" must { - // FIXME this test is failing with Artery - "create and supervise children on remote node" ignore { + "create and supervise children on remote node" in { val senderProbe = TestProbe()(masterSystem) val r = masterSystem.actorOf(Props[Echo1], "blub") r.path.toString should ===(s"artery://${system.name}@localhost:${port}/remote/artery/${masterSystem.name}@localhost:${masterPort}/user/blub") diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala index 6ce9ba3004..4d3380d03d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala @@ -22,7 +22,6 @@ object RemoteRouterSpec { } class RemoteRouterSpec extends AkkaSpec(""" - akka.loglevel=DEBUG akka.actor.provider = remote akka.remote.artery.enabled = on akka.remote.artery.hostname = localhost @@ -99,8 +98,7 @@ class RemoteRouterSpec extends AkkaSpec(""" "A Remote Router" must { - // FIXME this test is failing with Artery - "deploy its children on remote host driven by configuration" ignore { + "deploy its children on remote host driven by configuration" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps), "blub") val replies = collectRouteePaths(probe, router, 5) @@ -112,8 +110,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "deploy its children on remote host driven by programatic definition" ignore { + "deploy its children on remote host driven by programatic definition" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(new RemoteRouterConfig( RoundRobinPool(2), @@ -126,8 +123,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "deploy dynamic resizable number of children on remote host driven by configuration" ignore { + "deploy dynamic resizable number of children on remote host driven by configuration" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(FromConfig.props(echoActorProps), "elastic-blub") val replies = collectRouteePaths(probe, router, 5000) @@ -152,8 +148,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "deploy remote routers based on explicit deployment" ignore { + "deploy remote routers based on explicit deployment" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps) .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"artery://${sysName}@localhost:${port}")))), "remote-blub2") @@ -168,8 +163,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "let remote deployment be overridden by local configuration" ignore { + "let remote deployment be overridden by local configuration" in { val probe = TestProbe()(masterSystem) val router = masterSystem.actorOf(RoundRobinPool(2).props(echoActorProps) .withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"artery://${sysName}@localhost:${port}")))), "local-blub") @@ -214,8 +208,7 @@ class RemoteRouterSpec extends AkkaSpec(""" masterSystem.stop(router) } - // FIXME this test is failing with Artery - "set supplied supervisorStrategy" ignore { + "set supplied supervisorStrategy" in { val probe = TestProbe()(masterSystem) val escalator = OneForOneStrategy() { case e ⇒ probe.ref ! e; SupervisorStrategy.Escalate diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index b964a1c741..36b8340a4b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -73,7 +73,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi } private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = { - val recipient = null.asInstanceOf[InternalActorRef] // not used + val recipient = OptionVal.None // not used Flow[Send] .map { case Send(sysEnv: SystemMessageEnvelope, _, _, _) ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 94763cda5c..34371d7673 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -85,7 +85,7 @@ private[akka] class TestOutboundContext( override def sendControl(message: ControlMessage) = { controlProbe.foreach(_ ! message) - controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, OptionVal.None, localAddress.uid, + controlSubject.sendControl(InboundEnvelope(OptionVal.None, remoteAddress, message, OptionVal.None, localAddress.uid, OptionVal.None)) }