diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 15f190bbfd..bd9afb83a2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -168,11 +168,11 @@ object ActorSystem { final val ConfigVersion: String = getString("akka.version") final val ProviderClass: String = getString("akka.actor.provider") match { - case "local" => classOf[LocalActorRefProvider].getName + case "local" ⇒ classOf[LocalActorRefProvider].getName // these two cannot be referenced by class as they may not be on the classpath - case "remote" => "akka.remote.RemoteActorRefProvider" - case "cluster" => "akka.cluster.ClusterActorRefProvider" - case fqcn => fqcn + case "remote" ⇒ "akka.remote.RemoteActorRefProvider" + case "cluster" ⇒ "akka.cluster.ClusterActorRefProvider" + case fqcn ⇒ fqcn } final val SupervisorStrategyClass: String = getString("akka.actor.guardian-supervisor-strategy") diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 614a3f92fa..0104daa779 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -5,7 +5,7 @@ import scala.util.control.NonFatal import akka.actor.{ ActorRef, InternalActorRef } import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem -import akka.remote.{ MessageSerializer, UniqueAddress } +import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress } import akka.remote.EndpointManager.Send import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } @@ -95,6 +95,10 @@ private[remote] class Encoder( case _: SystemMessageEnvelope ⇒ log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName) throw e + case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒ + val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${send.recipient}: max allowed size ${envelope.byteBuffer.limit()} bytes. Message type [${send.message.getClass.getName}].") + log.error(reason, "Transient association error (association remains live)") + pull(in) case _ ⇒ log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName) pull(in) diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala new file mode 100644 index 0000000000..4640772ec8 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.{ ActorSystem, ExtendedActorSystem, RootActorPath } +import akka.remote.RARP +import akka.testkit.AkkaSpec +import com.typesafe.config.{ Config, ConfigFactory } + +object ArteryMultiNodeSpec { + def defaultConfig = + ConfigFactory.parseString(""" + akka { + actor.provider = remote + actor.warn-about-java-serializer-usage = off + remote.artery { + enabled = on + hostname = localhost + port = 0 + } + } + """) +} + +/** + * Base class for remoting tests what needs to test interaction between a "local" actor system + * which is always created (the usual AkkaSpec system), and multiple additional actor systems over artery + */ +abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withFallback(ArteryMultiNodeSpec.defaultConfig)) { + + def this() = this(ConfigFactory.empty()) + def this(extraConfig: String) = this(ConfigFactory.parseString(extraConfig)) + + /** just an alias to make tests more readable */ + def localSystem = system + def localPort = port(localSystem) + def port(system: ActorSystem): Int = RARP(system).provider.getDefaultAddress.port.get + def address(sys: ActorSystem) = RARP(sys).provider.getDefaultAddress + def rootActorPath(sys: ActorSystem) = RootActorPath(address(sys)) + def nextGeneratedSystemName = s"${localSystem.name}-remote-${remoteSystems.size}" + + private var remoteSystems: Vector[ActorSystem] = Vector.empty + + /** + * @return A new actor system configured with artery enabled. The system will + * automatically be terminated after test is completed to avoid leaks. + */ + def newRemoteSystem(extraConfig: Option[String] = None, name: Option[String] = None): ActorSystem = { + val config = + extraConfig.fold( + localSystem.settings.config + )( + str ⇒ ConfigFactory.parseString(str).withFallback(localSystem.settings.config) + ) + + val remoteSystem = ActorSystem(name.getOrElse(nextGeneratedSystemName), config) + remoteSystems = remoteSystems :+ remoteSystem + + remoteSystem + } + + override def afterTermination(): Unit = { + remoteSystems.foreach(sys ⇒ shutdown(sys)) + remoteSystems = Vector.empty + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala index d52e28da67..1eaaa5d103 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala @@ -3,13 +3,10 @@ */ package akka.remote.artery -import akka.actor.{ Actor, ActorRef, ActorSelection, ActorSystem, ExtendedActorSystem, Props, RootActorPath } -import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef } -import akka.testkit.{ SocketUtil, TestKit, TestProbe } +import akka.actor.{ Actor, ActorRef, ActorSelection, Props, RootActorPath } +import akka.remote.{ LargeDestination, RARP, RegularDestination, RemoteActorRef } +import akka.testkit.TestProbe import akka.util.ByteString -import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{ ShouldMatchers, WordSpec } import scala.concurrent.Await import scala.concurrent.duration._ @@ -24,130 +21,99 @@ object LargeMessagesStreamSpec { } } -class LargeMessagesStreamSpec extends WordSpec with ShouldMatchers with ScalaFutures { +class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( + """ + akka { + loglevel = ERROR + remote.artery.large-message-destinations = [ "/user/large" ] + } + """.stripMargin) { + import LargeMessagesStreamSpec._ - val config = ConfigFactory.parseString( - s""" - akka { - loglevel = ERROR - actor { - provider = remote - } - remote.artery { - enabled = on - hostname = localhost - port = 0 - large-message-destinations = [ - "/user/large" - ] - } - } - - """) - "The large message support" should { "not affect regular communication" in { - val systemA = ActorSystem("systemA", config) - val systemB = ActorSystem("systemB", config) + val systemA = localSystem + val systemB = newRemoteSystem() - try { - val senderProbeA = TestProbe()(systemA) - val senderProbeB = TestProbe()(systemB) + val senderProbeA = TestProbe()(systemA) + val senderProbeB = TestProbe()(systemB) - // start actor and make sure it is up and running - val regular = systemB.actorOf(Props(new EchoSize), "regular") - regular.tell(Ping(), senderProbeB.ref) - senderProbeB.expectMsg(Pong(0)) + // start actor and make sure it is up and running + val regular = systemB.actorOf(Props(new EchoSize), "regular") + regular.tell(Ping(), senderProbeB.ref) + senderProbeB.expectMsg(Pong(0)) - // communicate with it from the other system - val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - val rootB = RootActorPath(addressB) - val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular")) - regularRemote.tell(Ping(), senderProbeA.ref) - senderProbeA.expectMsg(Pong(0)) + // communicate with it from the other system + val regularRemote = awaitResolve(systemA.actorSelection(rootActorPath(systemB) / "user" / "regular")) + regularRemote.tell(Ping(), senderProbeA.ref) + senderProbeA.expectMsg(Pong(0)) - // flag should be cached now - regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination) + // flag should be cached now + regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination) - } finally { - TestKit.shutdownActorSystem(systemA) - TestKit.shutdownActorSystem(systemB) - } } "pass small regular messages over the large-message stream" in { - val systemA = ActorSystem("systemA", config) - val systemB = ActorSystem("systemB", config) + val systemA = localSystem + val systemB = newRemoteSystem() - try { - val senderProbeA = TestProbe()(systemA) - val senderProbeB = TestProbe()(systemB) + val senderProbeA = TestProbe()(systemA) + val senderProbeB = TestProbe()(systemB) - // start actor and make sure it is up and running - val large = systemB.actorOf(Props(new EchoSize), "large") - large.tell(Ping(), senderProbeB.ref) - senderProbeB.expectMsg(Pong(0)) + // start actor and make sure it is up and running + val large = systemB.actorOf(Props(new EchoSize), "large") + large.tell(Ping(), senderProbeB.ref) + senderProbeB.expectMsg(Pong(0)) - // communicate with it from the other system - val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - val rootB = RootActorPath(addressB) - val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large")) - largeRemote.tell(Ping(), senderProbeA.ref) - senderProbeA.expectMsg(Pong(0)) + // communicate with it from the other system + val addressB = RARP(systemB).provider.getDefaultAddress + val rootB = RootActorPath(addressB) + val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large")) + largeRemote.tell(Ping(), senderProbeA.ref) + senderProbeA.expectMsg(Pong(0)) - // flag should be cached now - largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination) + // flag should be cached now + largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination) - } finally { - TestKit.shutdownActorSystem(systemA) - TestKit.shutdownActorSystem(systemB) - } } "allow for normal communication while simultaneously sending large messages" in { - val systemA = ActorSystem("systemA", config) - val systemB = ActorSystem("systemB", config) + val systemA = localSystem + val systemB = newRemoteSystem() - try { + val senderProbeB = TestProbe()(systemB) - val senderProbeB = TestProbe()(systemB) + // setup two actors, one with the large flag and one regular + val large = systemB.actorOf(Props(new EchoSize), "large") + large.tell(Ping(), senderProbeB.ref) + senderProbeB.expectMsg(Pong(0)) - // setup two actors, one with the large flag and one regular - val large = systemB.actorOf(Props(new EchoSize), "large") - large.tell(Ping(), senderProbeB.ref) - senderProbeB.expectMsg(Pong(0)) + val regular = systemB.actorOf(Props(new EchoSize), "regular") + regular.tell(Ping(), senderProbeB.ref) + senderProbeB.expectMsg(Pong(0)) - val regular = systemB.actorOf(Props(new EchoSize), "regular") - regular.tell(Ping(), senderProbeB.ref) - senderProbeB.expectMsg(Pong(0)) + // both up and running, resolve remote refs + val addressB = RARP(systemB).provider.getDefaultAddress + val rootB = RootActorPath(addressB) + val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large")) + val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular")) - // both up and running, resolve remote refs - val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - val rootB = RootActorPath(addressB) - val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large")) - val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular")) + // send a large message, as well as regular one + val remoteProbe = TestProbe()(systemA) - // send a large message, as well as regular one - val remoteProbe = TestProbe()(systemA) + val largeBytes = 2000000 + largeRemote.tell(Ping(ByteString.fromArray(Array.ofDim[Byte](largeBytes))), remoteProbe.ref) + regularRemote.tell(Ping(), remoteProbe.ref) - val largeBytes = 2000000 - largeRemote.tell(Ping(ByteString.fromArray(Array.ofDim[Byte](largeBytes))), remoteProbe.ref) - regularRemote.tell(Ping(), remoteProbe.ref) + // should be no problems sending regular small messages while large messages are being sent + remoteProbe.expectMsg(Pong(0)) + remoteProbe.expectMsg(10.seconds, Pong(largeBytes)) - // should be no problems sending regular small messages while large messages are being sent - remoteProbe.expectMsg(Pong(0)) - remoteProbe.expectMsg(10.seconds, Pong(largeBytes)) - - // cached flags should be set now - largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination) - regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination) - - } finally { - TestKit.shutdownActorSystem(systemA) - TestKit.shutdownActorSystem(systemB) - } + // cached flags should be set now + largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(LargeDestination) + regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorForSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorForSpec.scala new file mode 100644 index 0000000000..80536c106e --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorForSpec.scala @@ -0,0 +1,130 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.{ Actor, ActorRef, ActorRefScope, PoisonPill, Props } +import akka.pattern.ask +import akka.remote.RemoteActorRef +import akka.remote.RemotingSpec.ActorForReq +import akka.testkit.{ EventFilter, _ } +import akka.util.Timeout + +import scala.concurrent.duration._ + +object RemoteActorForSpec { + final case class ActorForReq(s: String) +} + +class RemoteActorForSpec extends ArteryMultiNodeSpec("akka.loglevel=INFO") with ImplicitSender with DefaultTimeout { + + val remoteSystem = newRemoteSystem() + val remotePort = port(remoteSystem) + + "Remote lookups" should { + + "support remote look-ups" in { + remoteSystem.actorOf(TestActors.echoActorProps, "remote-look-ups") + val remoteRef = localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/user/remote-look-ups") + remoteRef ! "ping" + expectMsg("ping") + } + + // FIXME does not log anything currently + "send warning message for wrong address" ignore { + filterEvents(EventFilter.warning(pattern = "Address is now gated for ", occurrences = 1)) { + localSystem.actorFor("artery://nonexistingsystem@localhost:12346/user/echo") ! "ping" + } + } + + "support ask" in { + remoteSystem.actorOf(TestActors.echoActorProps, "support-ask") + val remoteRef = localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/user/support-ask") + + implicit val timeout: Timeout = 10.seconds + (remoteRef ? "ping").futureValue should ===("ping") + } + + "send dead letters on remote if actor does not exist" in { + EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { + localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/dead-letters-on-remote") ! "buh" + }(remoteSystem) + } + + // FIXME can't communicate with new ref looked up after starting a new instance (!?!) + "not send to remote re-created actor with same name" ignore { + + def lookItUp() = localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/user/re-created") + + val echo1 = remoteSystem.actorOf(TestActors.echoActorProps, "re-created") + val remoteRef1 = lookItUp() + remoteRef1 ! 2 + expectMsg(2) + + // now stop and start a new actor with the same name + watch(echo1) + remoteSystem.stop(echo1) + expectTerminated(echo1) + + val echo2 = remoteSystem.actorOf(TestActors.echoActorProps, "re-created") + val remoteRef2 = lookItUp() + remoteRef2 ! 2 + expectMsg(2) + + // the old ref should not interact with the + // new actor instance at the same path + remoteRef1 ! 3 + expectNoMsg(1.second) + + // and additionally, but it would have failed already + // if this wasn't true + remoteRef1.path.uid should !==(remoteRef2.path.uid) + } + + // FIXME needs remote deployment section + "look-up actors across node boundaries" ignore { + val l = localSystem.actorOf(Props(new Actor { + def receive = { + case (p: Props, n: String) ⇒ sender() ! context.actorOf(p, n) + case ActorForReq(s) ⇒ sender() ! context.actorFor(s) + } + }), "looker1") + // child is configured to be deployed on remote-sys (remoteSystem) + l ! ((TestActors.echoActorProps, "child")) + val child = expectMsgType[ActorRef] + // grandchild is configured to be deployed on RemotingSpec (system) + child ! ((TestActors.echoActorProps, "grandchild")) + val grandchild = expectMsgType[ActorRef] + grandchild.asInstanceOf[ActorRefScope].isLocal should ===(true) + grandchild ! 43 + expectMsg(43) + val myref = localSystem.actorFor(system / "looker1" / "child" / "grandchild") + myref.isInstanceOf[RemoteActorRef] should ===(true) + myref ! 44 + expectMsg(44) + lastSender should ===(grandchild) + lastSender should be theSameInstanceAs grandchild + child.asInstanceOf[RemoteActorRef].getParent should ===(l) + localSystem.actorFor("/user/looker1/child") should be theSameInstanceAs child + (l ? ActorForReq("child/..")).mapTo[AnyRef].futureValue should be theSameInstanceAs l + (localSystem.actorFor(system / "looker1" / "child") ? ActorForReq("..")).mapTo[AnyRef].futureValue should be theSameInstanceAs l + + watch(child) + child ! PoisonPill + expectMsg("postStop") + expectTerminated(child) + l ! ((TestActors.echoActorProps, "child")) + val child2 = expectMsgType[ActorRef] + child2 ! 45 + expectMsg(45) + // msg to old ActorRef (different uid) should not get through + child2.path.uid should not be (child.path.uid) + child ! 46 + expectNoMsg(1.second) + system.actorFor(system / "looker1" / "child") ! 47 + expectMsg(47) + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala index fcfe56ba6f..5410f0e0d2 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala @@ -3,41 +3,19 @@ */ package akka.remote.artery -import scala.concurrent.duration._ -import akka.actor.{ ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, RootActorPath } -import akka.testkit.{ AkkaSpec, ImplicitSender } -import akka.testkit.TestActors -import com.typesafe.config.ConfigFactory -import akka.testkit.EventFilter -import akka.actor.InternalActorRef +import akka.actor.{ EmptyLocalActorRef, InternalActorRef } import akka.remote.RemoteActorRef -import akka.actor.EmptyLocalActorRef +import akka.testkit.{ EventFilter, TestActors } -object RemoteActorRefProviderSpec { +class RemoteActorRefProviderSpec extends ArteryMultiNodeSpec { - val config = ConfigFactory.parseString(s""" - akka { - actor.provider = remote - remote.artery.enabled = on - remote.artery.hostname = localhost - remote.artery.port = 0 - } - """) - -} - -class RemoteActorRefProviderSpec extends AkkaSpec(RemoteActorRefProviderSpec.config) with ImplicitSender { - import RemoteActorRefProviderSpec._ - - val addressA = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val addressA = address(localSystem) system.actorOf(TestActors.echoActorProps, "echo") - val systemB = ActorSystem("systemB", system.settings.config) - val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val systemB = newRemoteSystem() + val addressB = address(systemB) systemB.actorOf(TestActors.echoActorProps, "echo") - override def afterTermination(): Unit = shutdown(systemB) - "RemoteActorRefProvider" must { "resolve local actor selection" in { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorSelectionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorSelectionSpec.scala new file mode 100644 index 0000000000..e5ea82fd13 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorSelectionSpec.scala @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorRefScope, ActorSelection, ActorSystem, ExtendedActorSystem, Identify, PoisonPill, Props, Terminated } +import akka.remote.RARP +import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestActors } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object RemoteActorSelectionSpec { + final case class ActorSelReq(s: String) + + class SelectionActor extends Actor { + def receive = { + // if we get props and a name, create a child, send ref back + case (p: Props, n: String) ⇒ sender() ! context.actorOf(p, n) + // or select actor from here + case ActorSelReq(s) ⇒ sender() ! context.actorSelection(s) + } + } + def selectionActorProps = Props(new SelectionActor) +} + +class RemoteActorSelectionSpec extends ArteryMultiNodeSpec with ImplicitSender { + + import RemoteActorSelectionSpec._ + + val systemB = system + + val systemA = { + val remotePort = port(systemB) + val remoteSysName = systemB.name + + val localSysName = "local-" + remoteSysName + val localPort = SocketUtil.temporaryServerAddress(udp = true).getPort + + // nesting the hierarchy across the two systems + newRemoteSystem(Some(s""" + akka { + remote.artery.port = $localPort + actor.deployment { + /looker2/child.remote = "artery://$remoteSysName@localhost:$remotePort" + /looker2/child/grandchild.remote = "artery://$localSysName@localhost:$localPort" + } + } + """)) + } + + "Remote actor selection" should { + + // TODO would like to split up in smaller cases but find it hard + // TODO fails with "received Supervise from unregistered child" when looker2/child is created - akka/akka#20715 + "select actors across node boundaries" ignore { + + val localLooker2 = systemA.actorOf(selectionActorProps, "looker2") + + // child is configured to be deployed on remoteSystem + localLooker2 ! ((selectionActorProps, "child")) + val remoteChild = expectMsgType[ActorRef] + + // grandchild is configured to be deployed on local system but from remote system + remoteChild ! ((selectionActorProps, "grandchild")) + val localGrandchild = expectMsgType[ActorRef] + localGrandchild.asInstanceOf[ActorRefScope].isLocal should ===(true) + localGrandchild ! 53 + expectMsg(53) + + val localGrandchildSelection = systemA.actorSelection(system / "looker2" / "child" / "grandchild") + localGrandchildSelection ! 54 + expectMsg(54) + lastSender should ===(localGrandchild) + lastSender should be theSameInstanceAs localGrandchild + localGrandchildSelection ! Identify(localGrandchildSelection) + val grandchild2 = expectMsgType[ActorIdentity].ref + grandchild2 should ===(Some(localGrandchild)) + + systemA.actorSelection("/user/looker2/child") ! Identify(None) + expectMsgType[ActorIdentity].ref should ===(Some(remoteChild)) + + localLooker2 ! ActorSelReq("child/..") + expectMsgType[ActorSelection] ! Identify(None) + expectMsgType[ActorIdentity].ref.get should be theSameInstanceAs localLooker2 + + system.actorSelection(system / "looker2" / "child") ! ActorSelReq("..") + expectMsgType[ActorSelection] ! Identify(None) + expectMsgType[ActorIdentity].ref.get should be theSameInstanceAs localLooker2 + + localGrandchild ! ((TestActors.echoActorProps, "grandgrandchild")) + val grandgrandchild = expectMsgType[ActorRef] + + system.actorSelection("/user/looker2/child") ! Identify("idReq1") + expectMsg(ActorIdentity("idReq1", Some(remoteChild))) + system.actorSelection(remoteChild.path) ! Identify("idReq2") + expectMsg(ActorIdentity("idReq2", Some(remoteChild))) + system.actorSelection("/user/looker2/*") ! Identify("idReq3") + expectMsg(ActorIdentity("idReq3", Some(remoteChild))) + + system.actorSelection("/user/looker2/child/grandchild") ! Identify("idReq4") + expectMsg(ActorIdentity("idReq4", Some(localGrandchild))) + system.actorSelection(remoteChild.path / "grandchild") ! Identify("idReq5") + expectMsg(ActorIdentity("idReq5", Some(localGrandchild))) + system.actorSelection("/user/looker2/*/grandchild") ! Identify("idReq6") + expectMsg(ActorIdentity("idReq6", Some(localGrandchild))) + system.actorSelection("/user/looker2/child/*") ! Identify("idReq7") + expectMsg(ActorIdentity("idReq7", Some(localGrandchild))) + system.actorSelection(remoteChild.path / "*") ! Identify("idReq8") + expectMsg(ActorIdentity("idReq8", Some(localGrandchild))) + + system.actorSelection("/user/looker2/child/grandchild/grandgrandchild") ! Identify("idReq9") + expectMsg(ActorIdentity("idReq9", Some(grandgrandchild))) + system.actorSelection(remoteChild.path / "grandchild" / "grandgrandchild") ! Identify("idReq10") + expectMsg(ActorIdentity("idReq10", Some(grandgrandchild))) + system.actorSelection("/user/looker2/child/*/grandgrandchild") ! Identify("idReq11") + expectMsg(ActorIdentity("idReq11", Some(grandgrandchild))) + system.actorSelection("/user/looker2/child/*/*") ! Identify("idReq12") + expectMsg(ActorIdentity("idReq12", Some(grandgrandchild))) + system.actorSelection(remoteChild.path / "*" / "grandgrandchild") ! Identify("idReq13") + expectMsg(ActorIdentity("idReq13", Some(grandgrandchild))) + + val sel1 = system.actorSelection("/user/looker2/child/grandchild/grandgrandchild") + system.actorSelection(sel1.toSerializationFormat) ! Identify("idReq18") + expectMsg(ActorIdentity("idReq18", Some(grandgrandchild))) + + remoteChild ! Identify("idReq14") + expectMsg(ActorIdentity("idReq14", Some(remoteChild))) + watch(remoteChild) + remoteChild ! PoisonPill + expectMsg("postStop") + expectMsgType[Terminated].actor should ===(remoteChild) + localLooker2 ! ((TestActors.echoActorProps, "child")) + val child2 = expectMsgType[ActorRef] + child2 ! Identify("idReq15") + expectMsg(ActorIdentity("idReq15", Some(child2))) + system.actorSelection(remoteChild.path) ! Identify("idReq16") + expectMsg(ActorIdentity("idReq16", Some(child2))) + remoteChild ! Identify("idReq17") + expectMsg(ActorIdentity("idReq17", None)) + + child2 ! 55 + expectMsg(55) + // msg to old ActorRef (different uid) should not get through + child2.path.uid should not be (remoteChild.path.uid) + remoteChild ! 56 + expectNoMsg(1.second) + system.actorSelection(system / "looker2" / "child") ! 57 + expectMsg(57) + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteConnectionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteConnectionSpec.scala new file mode 100644 index 0000000000..168e67ad0a --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteConnectionSpec.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.{ ActorSystem, ExtendedActorSystem } +import akka.remote.RARP +import akka.testkit.SocketUtil._ +import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender, TestActors, TestEvent, TestProbe } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +class RemoteConnectionSpec extends ArteryMultiNodeSpec("akka.remote.retry-gate-closed-for = 5s") with ImplicitSender { + + def muteSystem(system: ActorSystem) { + system.eventStream.publish(TestEvent.Mute( + EventFilter.error(start = "AssociationError"), + EventFilter.warning(start = "AssociationError"), + EventFilter.warning(pattern = "received dead letter.*"))) + } + + "Remoting between systems" should { + + "be able to connect to system even if it's not there at first" in { + muteSystem(localSystem) + val localProbe = new TestProbe(localSystem) + + val remotePort = temporaryServerAddress(udp = true).getPort + + // try to talk to it before it is up + val selection = localSystem.actorSelection(s"artery://$nextGeneratedSystemName@localhost:$remotePort/user/echo") + selection.tell("ping", localProbe.ref) + localProbe.expectNoMsg(1.seconds) + + // then start the remote system and try again + val remoteSystem = newRemoteSystem(extraConfig = Some(s"akka.remote.artery.port=$remotePort")) + + muteSystem(remoteSystem) + localProbe.expectNoMsg(2.seconds) + remoteSystem.actorOf(TestActors.echoActorProps, "echo") + + within(5.seconds) { + awaitAssert { + selection.tell("ping", localProbe.ref) + localProbe.expectMsg(500.millis, "ping") + } + } + } + + "allow other system to connect even if it's not there at first" in { + val localSystem = newRemoteSystem() + + val localPort = port(localSystem) + muteSystem(localSystem) + + val localProbe = new TestProbe(localSystem) + localSystem.actorOf(TestActors.echoActorProps, "echo") + + val remotePort = temporaryServerAddress(udp = true).getPort + + // try to talk to remote before it is up + val selection = localSystem.actorSelection(s"artery://$nextGeneratedSystemName@localhost:$remotePort/user/echo") + selection.tell("ping", localProbe.ref) + localProbe.expectNoMsg(1.seconds) + + // then when it is up, talk from other system + val remoteSystem = newRemoteSystem(extraConfig = Some(s"akka.remote.artery.port=$remotePort")) + + muteSystem(remoteSystem) + localProbe.expectNoMsg(2.seconds) + val otherProbe = new TestProbe(remoteSystem) + val otherSender = otherProbe.ref + val thisSelection = remoteSystem.actorSelection(s"artery://${localSystem.name}@localhost:$localPort/user/echo") + within(5.seconds) { + awaitAssert { + thisSelection.tell("ping", otherSender) + otherProbe.expectMsg(500.millis, "ping") + } + } + } + } + +} + diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala index 004547b081..c4bbf74cb4 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeploymentSpec.scala @@ -10,7 +10,7 @@ import akka.actor._ import akka.remote.routing._ import com.typesafe.config._ import akka.testkit.TestActors.echoActorProps -import akka.remote.RemoteScope +import akka.remote.{ RARP, RemoteScope } object RemoteDeploymentSpec { class Echo1 extends Actor { @@ -42,7 +42,7 @@ class RemoteDeploymentSpec extends AkkaSpec(""" import RemoteDeploymentSpec._ - val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get + val port = RARP(system).provider.getDefaultAddress.port.get val conf = ConfigFactory.parseString( s""" akka.actor.deployment { @@ -51,7 +51,7 @@ class RemoteDeploymentSpec extends AkkaSpec(""" """).withFallback(system.settings.config) val masterSystem = ActorSystem("Master" + system.name, conf) - val masterPort = masterSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get + val masterPort = RARP(masterSystem).provider.getDefaultAddress.port.get override def afterTermination(): Unit = { shutdown(masterSystem) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala new file mode 100644 index 0000000000..f719dfb6b8 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala @@ -0,0 +1,58 @@ +package akka.remote.artery + +import akka.remote.EndpointDisassociatedException +import akka.testkit.{ EventFilter, ImplicitSender, TestActors, TestEvent } + +import scala.concurrent.duration._ + +class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender { + + "Remoting" should { + + "not be exhausted by sending to broken connections" in { + val remoteSystems = Vector.fill(5)(newRemoteSystem()) + + remoteSystems foreach { sys ⇒ + sys.eventStream.publish(TestEvent.Mute( + EventFilter[EndpointDisassociatedException](), + EventFilter.warning(pattern = "received dead letter.*"))) + sys.actorOf(TestActors.echoActorProps, name = "echo") + } + val remoteSelections = remoteSystems map { sys ⇒ + system.actorSelection(rootActorPath(sys) / "user" / "echo") + } + + val echo = system.actorOf(TestActors.echoActorProps, name = "echo") + + val localSelection = system.actorSelection(rootActorPath(system) / "user" / "echo") + val n = 100 + + // first everything is up and running + 1 to n foreach { x ⇒ + localSelection ! "ping" + remoteSelections(x % remoteSystems.size) ! "ping" + } + + within(5.seconds) { + receiveN(n * 2) foreach { reply ⇒ reply should ===("ping") } + } + + // then we shutdown remote systems to simulate broken connections + remoteSystems foreach { sys ⇒ + shutdown(sys) + } + + 1 to n foreach { x ⇒ + localSelection ! "ping" + remoteSelections(x % remoteSystems.size) ! "ping" + } + + // ping messages to localEcho should go through even though we use many different broken connections + within(5.seconds) { + receiveN(n) foreach { reply ⇒ reply should ===("ping") } + } + + } + + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala new file mode 100644 index 0000000000..d0ed9e5a93 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala @@ -0,0 +1,117 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.io.NotSerializableException +import java.util.concurrent.ThreadLocalRandom + +import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, ExtendedActorSystem, PoisonPill, Props } +import akka.remote.{ AssociationErrorEvent, DisassociatedEvent, OversizedPayloadException, RARP } +import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender, TestActors } +import akka.util.ByteString +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object RemoteMessageSerializationSpec { + class ProxyActor(val one: ActorRef, val another: ActorRef) extends Actor { + def receive = { + case s if sender().path == one.path ⇒ another ! s + case s if sender().path == another.path ⇒ one ! s + } + } + val maxPayloadBytes = ArteryTransport.MaximumFrameSize +} + +class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec(""" + akka.actor.serialize-messages = off + akka.actor.serialize-creators = off + """) with ImplicitSender { + + import RemoteMessageSerializationSpec._ + + val remoteSystem = newRemoteSystem() + val remotePort = port(remoteSystem) + + "Remote message serialization" should { + + "drop unserializable messages" in { + object Unserializable + EventFilter[NotSerializableException](pattern = ".*No configured serialization.*", occurrences = 1).intercept { + verifySend(Unserializable) { + expectNoMsg(1.second) // No AssocitionErrorEvent should be published + } + } + } + + "allow messages up to payload size" in { + val maxProtocolOverhead = 500 // Make sure we're still under size after the message is serialized, etc + val big = byteStringOfSize(maxPayloadBytes - maxProtocolOverhead) + verifySend(big) { + expectMsg(3.seconds, big) + } + } + + "drop sent messages over payload size" in { + val oversized = byteStringOfSize(maxPayloadBytes + 1) + EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload sent.*", occurrences = 1).intercept { + verifySend(oversized) { + expectNoMsg(1.second) // No AssocitionErrorEvent should be published + } + } + } + + // TODO max payload size is not configurable yet, so we cannot send a too big message, it fails no sending side + "drop received messages over payload size" ignore { + // Receiver should reply with a message of size maxPayload + 1, which will be dropped and an error logged + EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload received.*", occurrences = 1).intercept { + verifySend(maxPayloadBytes + 1) { + expectNoMsg(1.second) // No AssocitionErrorEvent should be published + } + } + } + + "be able to serialize a local actor ref from another actor system" in { + remoteSystem.actorOf(TestActors.echoActorProps, "echo") + val local = localSystem.actorOf(TestActors.echoActorProps, "echo") + + val remoteEcho = system.actorSelection(rootActorPath(remoteSystem) / "user" / "echo").resolveOne(3.seconds).futureValue + remoteEcho ! local + expectMsg(3.seconds, local) + } + + } + + private def verifySend(msg: Any)(afterSend: ⇒ Unit) { + val bigBounceId = s"bigBounce-${ThreadLocalRandom.current.nextInt()}" + val bigBounceOther = remoteSystem.actorOf(Props(new Actor { + def receive = { + case x: Int ⇒ sender() ! byteStringOfSize(x) + case x ⇒ sender() ! x + } + }), bigBounceId) + val bigBounceHere = localSystem.actorFor(s"artery://${remoteSystem.name}@localhost:$remotePort/user/$bigBounceId") + + val eventForwarder = localSystem.actorOf(Props(new Actor { + def receive = { + case x ⇒ testActor ! x + } + })) + localSystem.eventStream.subscribe(eventForwarder, classOf[AssociationErrorEvent]) + localSystem.eventStream.subscribe(eventForwarder, classOf[DisassociatedEvent]) + try { + bigBounceHere ! msg + afterSend + expectNoMsg(500.millis) + } finally { + localSystem.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent]) + localSystem.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent]) + eventForwarder ! PoisonPill + bigBounceOther ! PoisonPill + } + } + + private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte)) + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala index 4d3380d03d..1f724b3fea 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala @@ -10,7 +10,7 @@ import akka.actor._ import akka.remote.routing._ import com.typesafe.config._ import akka.testkit.TestActors.echoActorProps -import akka.remote.RemoteScope +import akka.remote.{ RARP, RemoteScope } object RemoteRouterSpec { class Parent extends Actor { @@ -43,7 +43,7 @@ class RemoteRouterSpec extends AkkaSpec(""" import RemoteRouterSpec._ - val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get + val port = RARP(system).provider.getDefaultAddress.port.get val sysName = system.name val conf = ConfigFactory.parseString( s""" diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 3abbbb7309..a800d86dbc 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -8,6 +8,7 @@ import akka.actor.{ Actor, ActorIdentity, ActorSystem, Deploy, ExtendedActorSyst import akka.testkit.{ AkkaSpec, ImplicitSender } import com.typesafe.config.ConfigFactory import akka.actor.Actor.Receive +import akka.remote.RARP object RemoteSendConsistencySpec { @@ -25,7 +26,7 @@ object RemoteSendConsistencySpec { class RemoteSendConsistencySpec extends AkkaSpec(RemoteSendConsistencySpec.config) with ImplicitSender { val systemB = ActorSystem("systemB", system.settings.config) - val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val addressB = RARP(systemB).provider.getDefaultAddress println(addressB) val rootB = RootActorPath(addressB) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala index 7f4027f62a..ede7c7f62e 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala @@ -79,7 +79,7 @@ class RemoteWatcherSpec extends AkkaSpec( override def expectedTestDuration = 2.minutes val remoteSystem = ActorSystem("RemoteSystem", system.settings.config) - val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid Seq(system, remoteSystem).foreach(muteDeadLetters( diff --git a/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala index d997cf08fd..a555c51fc6 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala @@ -5,6 +5,7 @@ package akka.remote.artery import scala.concurrent.duration._ import akka.actor.{ ActorIdentity, ActorSystem, ExtendedActorSystem, Identify, RootActorPath } +import akka.remote.RARP import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.testkit.TestActors import com.typesafe.config.ConfigFactory @@ -40,7 +41,7 @@ class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) wit """).withFallback(system.settings.config) val systemB = ActorSystem("systemB", configB) systemB.actorOf(TestActors.echoActorProps, "echo") - val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val addressB = RARP(systemB).provider.getDefaultAddress val rootB = RootActorPath(addressB) override def afterTermination(): Unit = shutdown(systemB) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 36b8340a4b..5df50600d1 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -4,6 +4,7 @@ package akka.remote.artery import java.util.concurrent.ThreadLocalRandom + import scala.concurrent.Await import scala.concurrent.duration._ import akka.NotUsed @@ -14,10 +15,8 @@ import akka.actor.Identify import akka.actor.InternalActorRef import akka.actor.PoisonPill import akka.actor.RootActorPath -import akka.remote.AddressUidExtension +import akka.remote.{ AddressUidExtension, RARP, RemoteActorRef, UniqueAddress } import akka.remote.EndpointManager.Send -import akka.remote.RemoteActorRef -import akka.remote.UniqueAddress import akka.remote.artery.SystemMessageDelivery._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings @@ -52,11 +51,11 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi import SystemMessageDeliverySpec._ val addressA = UniqueAddress( - system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress, + RARP(system).provider.getDefaultAddress, AddressUidExtension(system).addressUid) val systemB = ActorSystem("systemB", system.settings.config) val addressB = UniqueAddress( - systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress, + RARP(systemB).provider.getDefaultAddress, AddressUidExtension(systemB).addressUid) val rootB = RootActorPath(addressB.address) val matSettings = ActorMaterializerSettings(system).withFuzzing(true) diff --git a/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala index 9b353a3be8..837ffd7dbe 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala @@ -23,6 +23,7 @@ import akka.testkit.TestProbe import akka.actor.ActorSelection import akka.testkit.TestEvent import akka.event.Logging +import akka.remote.RARP import akka.testkit.EventFilter object UntrustedSpec { @@ -77,7 +78,7 @@ class UntrustedSpec extends AkkaSpec(""" akka.remote.artery.hostname = localhost akka.remote.artery.port = 0 """)) - val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val addr = RARP(system).provider.getDefaultAddress val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")