diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 987f819b88..490b2c85a8 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -342,10 +342,17 @@ private[akka] class RemoteActorRefProvider( override private[akka] def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) - def rootGuardianAt(address: Address): ActorRef = + def rootGuardianAt(address: Address): ActorRef = { if (hasAddress(address)) rootGuardian - else new RemoteActorRef(transport, transport.localAddressForRemote(address), - RootActorPath(address), Nobody, props = None, deploy = None) + else try { + new RemoteActorRef(transport, transport.localAddressForRemote(address), + RootActorPath(address), Nobody, props = None, deploy = None) + } catch { + case NonFatal(e) ⇒ + log.error(e, "No root guardian at [{}]", address) + new EmptyLocalActorRef(this, RootActorPath(address), eventStream) + } + } /** * INTERNAL API @@ -355,9 +362,14 @@ private[akka] class RemoteActorRefProvider( path match { case ActorPathExtractor(address, elems) ⇒ if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems) - else + else try { new RemoteActorRef(transport, localAddress, RootActorPath(address) / elems, Nobody, props = None, deploy = None) + } catch { + case NonFatal(e) ⇒ + log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) + new EmptyLocalActorRef(this, RootActorPath(address) / elems, eventStream) + } case _ ⇒ log.debug("resolve of unknown path [{}] failed", path) deadLetters @@ -374,7 +386,7 @@ private[akka] class RemoteActorRefProvider( rootPath, Nobody, props = None, deploy = None) } catch { case NonFatal(e) ⇒ - log.warning("Error while resolving address [{}] due to [{}]", rootPath.address, e.getMessage) + log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) new EmptyLocalActorRef(this, rootPath, eventStream) } } @@ -390,7 +402,7 @@ private[akka] class RemoteActorRefProvider( path, Nobody, props = None, deploy = None) } catch { case NonFatal(e) ⇒ - log.error(e, "Error while resolving address [{}]", path.address) + log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) new EmptyLocalActorRef(this, path, eventStream) } } @@ -459,10 +471,19 @@ private[akka] class RemoteActorRef private[akka] ( deploy: Option[Deploy]) extends InternalActorRef with RemoteRef { - @volatile var cachedAssociation: artery.Association = null + remote match { + case t: ArteryTransport ⇒ + // detect mistakes such as using "akka.tcp" with Artery + if (path.address.protocol != t.localAddress.address.protocol) + throw new IllegalArgumentException( + s"Wrong protocol of [${path}], expected [${t.localAddress.address.protocol}]") + case _ ⇒ + } + + @volatile private[remote] var cachedAssociation: artery.Association = null // used by artery to direct messages to a separate stream for large messages - @volatile var cachedLargeMessageDestinationFlag: LargeMessageDestinationFlag = null + @volatile private[remote] var cachedLargeMessageDestinationFlag: LargeMessageDestinationFlag = null def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index f846360d6d..62bf1e9ecf 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -216,7 +216,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // these vars are initialized once in the start method @volatile private[this] var _localAddress: UniqueAddress = _ - override def localAddress: UniqueAddress = _localAddress + @volatile private[this] var _addresses: Set[Address] = _ @volatile private[this] var materializer: Materializer = _ @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ @@ -224,8 +224,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var aeron: Aeron = _ @volatile private[this] var aeronErrorLogTask: Cancellable = _ + override def localAddress: UniqueAddress = _localAddress override def defaultAddress: Address = localAddress.address - override def addresses: Set[Address] = Set(defaultAddress) + override def addresses: Set[Address] = _addresses override def localAddressForRemote(remote: Address): Address = defaultAddress override val log: LoggingAdapter = Logging(system, getClass.getName) private val eventPublisher = new EventPublisher(system, log, remoteSettings.RemoteLifecycleEventsLogLevel) @@ -284,8 +285,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // TODO: Configure materializer properly // TODO: Have a supervisor actor _localAddress = UniqueAddress( - Address("artery", system.name, remoteSettings.ArteryHostname, port), + Address(ArteryTransport.ProtocolName, system.name, remoteSettings.ArteryHostname, port), AddressUidExtension(system).longAddressUid) + _addresses = Set(_localAddress.address) val materializerSettings = ActorMaterializerSettings( remoteSettings.config.getConfig("akka.remote.artery.advanced.materializer")) @@ -465,11 +467,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation - val remoteAddress = recipient.path.address val a = if (cached ne null) cached - else association(remoteAddress) + else { + val a2 = association(recipient.path.address) + recipient.cachedAssociation = a2 + a2 + } a.send(message, senderOption, recipient) } @@ -574,6 +579,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R */ private[remote] object ArteryTransport { + val ProtocolName = "artery" + val Version = 0 val MaximumFrameSize = 1024 * 1024 val MaximumPooledBuffers = 256 diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index dbd188d377..ea7435cff5 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -560,7 +560,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D val otherGuyRemoteTest = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "test")) val remoteEchoHereSsl = system.actorFor(s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/user/echo") val proxySsl = system.actorOf(Props(classOf[Proxy], remoteEchoHereSsl, testActor), "proxy-ssl") - EventFilter.warning(start = "Error while resolving address", occurrences = 1).intercept { + EventFilter.warning(start = "Error while resolving ActorRef", occurrences = 1).intercept { proxySsl ! otherGuy expectMsg(3.seconds, ("pong", otherGuyRemoteTest)) }(otherSystem) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala new file mode 100644 index 0000000000..3d0614dc79 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ +import akka.actor.{ ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, RootActorPath } +import akka.testkit.{ AkkaSpec, ImplicitSender } +import akka.testkit.TestActors +import com.typesafe.config.ConfigFactory +import akka.testkit.EventFilter +import akka.actor.InternalActorRef +import akka.remote.RemoteActorRef +import akka.actor.EmptyLocalActorRef + +object RemoteActorRefProviderSpec { + + val config = ConfigFactory.parseString(s""" + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery.enabled = on + remote.artery.hostname = localhost + remote.artery.port = 0 + } + """) + +} + +class RemoteActorRefProviderSpec extends AkkaSpec(RemoteActorRefProviderSpec.config) with ImplicitSender { + import RemoteActorRefProviderSpec._ + + val addressA = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + system.actorOf(TestActors.echoActorProps, "echo") + + val systemB = ActorSystem("systemB", system.settings.config) + val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + systemB.actorOf(TestActors.echoActorProps, "echo") + + override def afterTermination(): Unit = shutdown(systemB) + + "RemoteActorRefProvider" must { + + "resolve local actor selection" in { + val sel = system.actorSelection(s"artery://${system.name}@${addressA.host.get}:${addressA.port.get}/user/echo") + sel.anchor.asInstanceOf[InternalActorRef].isLocal should be(true) + } + + "resolve remote actor selection" in { + val sel = system.actorSelection(s"artery://${systemB.name}@${addressB.host.get}:${addressB.port.get}/user/echo") + sel.anchor.getClass should ===(classOf[RemoteActorRef]) + sel.anchor.asInstanceOf[InternalActorRef].isLocal should be(false) + } + + "detect wrong protocol" in { + EventFilter[IllegalArgumentException](start = "No root guardian at", occurrences = 1).intercept { + val sel = system.actorSelection(s"akka.tcp://${systemB.name}@${addressB.host.get}:${addressB.port.get}/user/echo") + sel.anchor.getClass should ===(classOf[EmptyLocalActorRef]) + } + } + + } + +}