Use Java serialization for typed ActorRef #24135
This commit is contained in:
parent
a4ebc2b73f
commit
0e852fb988
4 changed files with 104 additions and 47 deletions
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
package akka.actor.typed.internal
|
||||
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.{ ActorRef, TypedAkkaSpecWithShutdown }
|
||||
import akka.serialization.{ JavaSerializer, SerializationExtension }
|
||||
import akka.testkit.typed.TestKit
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object ActorRefSerializationSpec {
|
||||
def config = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.actor {
|
||||
serialize-messages = off
|
||||
allow-java-serialization = true
|
||||
}
|
||||
akka.remote.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
""")
|
||||
|
||||
case class MessageWrappingActorRef(s: String, ref: ActorRef[Unit]) extends java.io.Serializable
|
||||
}
|
||||
|
||||
class ActorRefSerializationSpec extends TestKit(ActorRefSerializationSpec.config) with TypedAkkaSpecWithShutdown {
|
||||
|
||||
val serialization = SerializationExtension(system.toUntyped)
|
||||
|
||||
"ActorRef[T]" must {
|
||||
"be serialized and deserialized by MiscMessageSerializer" in {
|
||||
val obj = spawn(Behaviors.empty[Unit])
|
||||
serialization.findSerializerFor(obj) match {
|
||||
case serializer: MiscMessageSerializer ⇒
|
||||
val blob = serializer.toBinary(obj)
|
||||
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
|
||||
ref should ===(obj)
|
||||
case s ⇒
|
||||
throw new IllegalStateException(s"Wrong serializer ${s.getClass} for ${obj.getClass}")
|
||||
}
|
||||
}
|
||||
|
||||
"be serialized and deserialized by JavaSerializer inside another java.io.Serializable message" in {
|
||||
val ref = spawn(Behaviors.empty[Unit])
|
||||
val obj = ActorRefSerializationSpec.MessageWrappingActorRef("some message", ref)
|
||||
|
||||
serialization.findSerializerFor(obj) match {
|
||||
case serializer: JavaSerializer ⇒
|
||||
val blob = serializer.toBinary(obj)
|
||||
val restored = serializer.fromBinary(blob, None)
|
||||
restored should ===(obj)
|
||||
case s ⇒
|
||||
throw new IllegalStateException(s"Wrong serializer ${s.getClass} for ${obj.getClass}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,46 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
package akka.actor.typed.internal
|
||||
|
||||
import akka.actor.typed.TypedAkkaSpecWithShutdown
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.testkit.typed.TestKit
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object MiscMessageSerializerSpec {
|
||||
def config = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.actor {
|
||||
serialize-messages = off
|
||||
allow-java-serialization = true
|
||||
}
|
||||
akka.remote.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
""")
|
||||
}
|
||||
|
||||
class MiscMessageSerializerSpec extends TestKit(MiscMessageSerializerSpec.config) with TypedAkkaSpecWithShutdown {
|
||||
|
||||
val serialization = SerializationExtension(system.toUntyped)
|
||||
|
||||
"MiscMessageSerializer" must {
|
||||
def checkSerialization(obj: AnyRef): Unit = {
|
||||
serialization.findSerializerFor(obj) match {
|
||||
case serializer: MiscMessageSerializer ⇒
|
||||
val blob = serializer.toBinary(obj)
|
||||
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
|
||||
ref should ===(obj)
|
||||
case s ⇒
|
||||
throw new IllegalStateException(s"Wrong serializer ${s.getClass} for ${obj.getClass}")
|
||||
}
|
||||
}
|
||||
|
||||
"must serialize and deserialize typed actor refs" in {
|
||||
val ref = spawn(Behaviors.empty[Unit])
|
||||
checkSerialization(ref)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -21,7 +21,7 @@ import scala.util.Success
|
|||
* [[EventStream]] on a best effort basis
|
||||
* (i.e. this delivery is not reliable).
|
||||
*/
|
||||
trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] {
|
||||
trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] with java.io.Serializable {
|
||||
/**
|
||||
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
|
||||
* messaging semantics.
|
||||
|
|
@ -48,6 +48,8 @@ trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] {
|
|||
*/
|
||||
def path: a.ActorPath
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = SerializedActorRef[T](this)
|
||||
}
|
||||
|
||||
object ActorRef {
|
||||
|
|
@ -72,3 +74,43 @@ object ActorRef {
|
|||
case _ ⇒ throw new IllegalStateException("Only expecting completed futures until the native actor system is implemented")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object SerializedActorRef {
|
||||
def apply[T](actorRef: ActorRef[T]): SerializedActorRef[T] = {
|
||||
new SerializedActorRef(actorRef)
|
||||
}
|
||||
|
||||
def toAddress[T](actorRef: ActorRef[T]) = {
|
||||
import akka.serialization.JavaSerializer.currentSystem
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
val resolver = ActorRefResolver(currentSystem.value.toTyped)
|
||||
resolver.toSerializationFormat(actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Memento pattern for serializing ActorRefs transparently
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[akka] final case class SerializedActorRef[T] private (address: String) {
|
||||
import akka.serialization.JavaSerializer.currentSystem
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
|
||||
def this(actorRef: ActorRef[T]) =
|
||||
this(SerializedActorRef.toAddress(actorRef))
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
def readResolve(): AnyRef = currentSystem.value match {
|
||||
case null ⇒
|
||||
throw new IllegalStateException(
|
||||
"Trying to deserialize a serialized typed ActorRef without an ActorSystem in scope." +
|
||||
" Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'")
|
||||
case someSystem ⇒
|
||||
val resolver = ActorRefResolver(someSystem.toTyped)
|
||||
resolver.resolveActorRef(address)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,9 @@ import akka.dispatch.sysmsg
|
|||
override def isLocal: Boolean = untyped.isLocal
|
||||
override def sendSystem(signal: internal.SystemMessage): Unit =
|
||||
ActorRefAdapter.sendSystemMessage(untyped, signal)
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = SerializedActorRef[T](this)
|
||||
}
|
||||
|
||||
private[akka] object ActorRefAdapter {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue