From fd11b94b1fd720e570e84c631aa01af86b207b90 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 8 Sep 2017 10:19:43 +0200 Subject: [PATCH 1/2] Remoting for Akka Typed, #21225 * seems like it will just work when using the adapters, since it will simply delegate to the untyped RemoteActorRef * ActorRefResolver was added for upporting erialization of typed ActorRef * The ActorRef itself is not serializable with Java serialization, and we shouldn't do that --- .../akka/typed/cluster/ActorRefResolver.scala | 43 +++++++++++++++++++ build.sbt | 2 +- 2 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 akka-typed/src/main/scala/akka/typed/cluster/ActorRefResolver.scala diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ActorRefResolver.scala b/akka-typed/src/main/scala/akka/typed/cluster/ActorRefResolver.scala new file mode 100644 index 0000000000..a5460ec1d0 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ActorRefResolver.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster + +import akka.typed.ActorSystem +import akka.typed.Extension +import akka.typed.ExtensionId +import akka.typed.ActorRef +import akka.actor.ExtendedActorSystem + +object ActorRefResolver extends ExtensionId[ActorRefResolver] { + def get(system: ActorSystem[_]): ActorRefResolver = apply(system) + + override def createExtension(system: ActorSystem[_]): ActorRefResolver = + new ActorRefResolver(system) +} + +/** + * Serialization and deserialization of `ActorRef`. + */ +class ActorRefResolver(system: ActorSystem[_]) extends Extension { + import akka.typed.scaladsl.adapter._ + + private val untypedSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem] + + /** + * Generate full String representation including the uid for the actor cell + * instance as URI fragment, replacing the Address in the RootActor Path + * with the local one unless this path’s address includes host and port + * information. This representation should be used as serialized + * representation. + */ + def toSerializationFormat[T](ref: ActorRef[T]): String = + ref.path.toSerializationFormatWithAddress(untypedSystem.provider.getDefaultAddress) + + /** + * Deserialize an `ActorRef` in the [[#toSerializationFormat]]. + */ + def resolveActorRef[T](serializedActorRef: String): ActorRef[T] = + untypedSystem.provider.resolveActorRef(serializedActorRef) +} + diff --git a/build.sbt b/build.sbt index 189521ed83..5db6120695 100644 --- a/build.sbt +++ b/build.sbt @@ -158,7 +158,7 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck") .dependsOn(streamTestkit % "test->test", stream) lazy val typed = akkaModule("akka-typed") - .dependsOn(testkit % "compile->compile;test->test") + .dependsOn(testkit % "compile->compile;test->test", cluster % "compile->compile;test->test") lazy val typedTests = akkaModule("akka-typed-tests") .dependsOn(typed, typedTestkit % "compile->compile;test->test") From 925cc163f7e8e5b0123ba5ff322abfb4f7ab8b01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 8 Sep 2017 10:22:36 +0200 Subject: [PATCH 2/2] First test for Akka Typed remoting, #21225 --- .../typed/cluster/RemoteMessageSpec.scala | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 akka-typed-tests/src/test/scala/akka/typed/cluster/RemoteMessageSpec.scala diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/RemoteMessageSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/RemoteMessageSpec.scala new file mode 100644 index 0000000000..23165a4144 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/RemoteMessageSpec.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster + +import java.nio.charset.StandardCharsets + +import akka.Done +import akka.testkit.AkkaSpec +import akka.typed.{ ActorRef, ActorSystem } +import akka.typed.scaladsl.Actor +import akka.actor.{ ExtendedActorSystem, ActorSystem ⇒ UntypedActorSystem } +import akka.cluster.Cluster +import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } +import com.typesafe.config.ConfigFactory +import scala.concurrent.Promise +import akka.typed.scaladsl.adapter._ + +class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + override def identifier = 41 + override def manifest(o: AnyRef) = "a" + override def toBinary(o: AnyRef) = o match { + case RemoteMessageSpec.Ping(who) ⇒ + ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8) + } + override def fromBinary(bytes: Array[Byte], manifest: String) = { + val str = new String(bytes, StandardCharsets.UTF_8) + val ref = ActorRefResolver(system.toTyped).resolveActorRef[String](str) + RemoteMessageSpec.Ping(ref) + } +} + +object RemoteMessageSpec { + def config = ConfigFactory.parseString( + s""" + akka { + loglevel = debug + actor { + provider = cluster + warn-about-java-serializer-usage = off + serialize-creators = off + serializers { + test = "akka.typed.cluster.PingSerializer" + } + serialization-bindings { + "akka.typed.cluster.RemoteMessageSpec$$Ping" = test + } + } + remote.artery { + enabled = on + canonical { + hostname = 127.0.0.1 + port = 0 + } + } + } + """) + + case class Ping(sender: ActorRef[String]) +} + +class RemoteMessageSpec extends AkkaSpec(RemoteMessageSpec.config) { + + import RemoteMessageSpec._ + + val typedSystem = system.toTyped + + "the adapted system" should { + + "something something" in { + + val pingPromise = Promise[Done]() + val ponger = Actor.immutable[Ping]((_, msg) ⇒ + msg match { + case Ping(sender) ⇒ + pingPromise.success(Done) + sender ! "pong" + Actor.stopped + }) + + // typed actor on system1 + val pingPongActor = system.spawn(ponger, "pingpong") + + val system2 = UntypedActorSystem(system.name + "-system2", RemoteMessageSpec.config) + val typedSystem2 = system2.toTyped + try { + + // resolve the actor from node2 + val remoteRefStr = ActorRefResolver(typedSystem).toSerializationFormat(pingPongActor) + val remoteRef: ActorRef[Ping] = + ActorRefResolver(typedSystem2).resolveActorRef[Ping](remoteRefStr) + + val pongPromise = Promise[Done]() + val recipient = system2.spawn(Actor.immutable[String] { (_, msg) ⇒ + pongPromise.success(Done) + Actor.stopped + }, "recipient") + remoteRef ! Ping(recipient) + + pingPromise.future.futureValue should ===(Done) + pongPromise.future.futureValue should ===(Done) + + } finally { + system2.terminate() + } + } + + } + +}