Merge pull request #23612 from akka/wip-typed-remote-patriknw

Remoting for Akka Typed, #21225
This commit is contained in:
Patrik Nordwall 2017-09-20 12:24:13 +02:00 committed by GitHub
commit d0067b30dc
3 changed files with 154 additions and 1 deletions

View file

@ -0,0 +1,110 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import java.nio.charset.StandardCharsets
import akka.Done
import akka.testkit.AkkaSpec
import akka.typed.{ ActorRef, ActorSystem }
import akka.typed.scaladsl.Actor
import akka.actor.{ ExtendedActorSystem, ActorSystem UntypedActorSystem }
import akka.cluster.Cluster
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import com.typesafe.config.ConfigFactory
import scala.concurrent.Promise
import akka.typed.scaladsl.adapter._
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
override def identifier = 41
override def manifest(o: AnyRef) = "a"
override def toBinary(o: AnyRef) = o match {
case RemoteMessageSpec.Ping(who)
ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
}
override def fromBinary(bytes: Array[Byte], manifest: String) = {
val str = new String(bytes, StandardCharsets.UTF_8)
val ref = ActorRefResolver(system.toTyped).resolveActorRef[String](str)
RemoteMessageSpec.Ping(ref)
}
}
object RemoteMessageSpec {
def config = ConfigFactory.parseString(
s"""
akka {
loglevel = debug
actor {
provider = cluster
warn-about-java-serializer-usage = off
serialize-creators = off
serializers {
test = "akka.typed.cluster.PingSerializer"
}
serialization-bindings {
"akka.typed.cluster.RemoteMessageSpec$$Ping" = test
}
}
remote.artery {
enabled = on
canonical {
hostname = 127.0.0.1
port = 0
}
}
}
""")
case class Ping(sender: ActorRef[String])
}
class RemoteMessageSpec extends AkkaSpec(RemoteMessageSpec.config) {
import RemoteMessageSpec._
val typedSystem = system.toTyped
"the adapted system" should {
"something something" in {
val pingPromise = Promise[Done]()
val ponger = Actor.immutable[Ping]((_, msg)
msg match {
case Ping(sender)
pingPromise.success(Done)
sender ! "pong"
Actor.stopped
})
// typed actor on system1
val pingPongActor = system.spawn(ponger, "pingpong")
val system2 = UntypedActorSystem(system.name + "-system2", RemoteMessageSpec.config)
val typedSystem2 = system2.toTyped
try {
// resolve the actor from node2
val remoteRefStr = ActorRefResolver(typedSystem).toSerializationFormat(pingPongActor)
val remoteRef: ActorRef[Ping] =
ActorRefResolver(typedSystem2).resolveActorRef[Ping](remoteRefStr)
val pongPromise = Promise[Done]()
val recipient = system2.spawn(Actor.immutable[String] { (_, msg)
pongPromise.success(Done)
Actor.stopped
}, "recipient")
remoteRef ! Ping(recipient)
pingPromise.future.futureValue should ===(Done)
pongPromise.future.futureValue should ===(Done)
} finally {
system2.terminate()
}
}
}
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import akka.typed.ActorSystem
import akka.typed.Extension
import akka.typed.ExtensionId
import akka.typed.ActorRef
import akka.actor.ExtendedActorSystem
object ActorRefResolver extends ExtensionId[ActorRefResolver] {
def get(system: ActorSystem[_]): ActorRefResolver = apply(system)
override def createExtension(system: ActorSystem[_]): ActorRefResolver =
new ActorRefResolver(system)
}
/**
* Serialization and deserialization of `ActorRef`.
*/
class ActorRefResolver(system: ActorSystem[_]) extends Extension {
import akka.typed.scaladsl.adapter._
private val untypedSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem]
/**
* Generate full String representation including the uid for the actor cell
* instance as URI fragment, replacing the Address in the RootActor Path
* with the local one unless this paths address includes host and port
* information. This representation should be used as serialized
* representation.
*/
def toSerializationFormat[T](ref: ActorRef[T]): String =
ref.path.toSerializationFormatWithAddress(untypedSystem.provider.getDefaultAddress)
/**
* Deserialize an `ActorRef` in the [[#toSerializationFormat]].
*/
def resolveActorRef[T](serializedActorRef: String): ActorRef[T] =
untypedSystem.provider.resolveActorRef(serializedActorRef)
}

View file

@ -158,7 +158,7 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck")
.dependsOn(streamTestkit % "test->test", stream) .dependsOn(streamTestkit % "test->test", stream)
lazy val typed = akkaModule("akka-typed") lazy val typed = akkaModule("akka-typed")
.dependsOn(testkit % "compile->compile;test->test") .dependsOn(testkit % "compile->compile;test->test", cluster % "compile->compile;test->test")
lazy val typedTests = akkaModule("akka-typed-tests") lazy val typedTests = akkaModule("akka-typed-tests")
.dependsOn(typed, typedTestkit % "compile->compile;test->test") .dependsOn(typed, typedTestkit % "compile->compile;test->test")