use CBOR in some tests instead of custom serializers (#27882)
This commit is contained in:
parent
ca8995be68
commit
0236afc039
8 changed files with 123 additions and 253 deletions
|
|
@ -4,19 +4,15 @@
|
|||
|
||||
package akka.cluster.sharding.typed.scaladsl
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.testkit.typed.scaladsl.ActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorRefResolver
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.PostStop
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
|
|
@ -28,11 +24,11 @@ import akka.cluster.typed.Cluster
|
|||
import akka.cluster.typed.Join
|
||||
import akka.cluster.typed.Leave
|
||||
import akka.pattern.AskTimeoutException
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
import akka.util.Timeout
|
||||
import akka.util.ccompat._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
import akka.util.ccompat._
|
||||
|
||||
@ccompatUsedUntil213
|
||||
object ClusterShardingSpec {
|
||||
|
|
@ -50,83 +46,20 @@ object ClusterShardingSpec {
|
|||
|
||||
akka.coordinated-shutdown.terminate-actor-system = off
|
||||
akka.coordinated-shutdown.run-by-actor-system-terminate = off
|
||||
|
||||
akka.actor {
|
||||
serializers {
|
||||
test = "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$Serializer"
|
||||
}
|
||||
serialization-bindings {
|
||||
"akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$TestProtocol" = test
|
||||
"akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$IdTestProtocol" = test
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
sealed trait TestProtocol extends java.io.Serializable
|
||||
sealed trait TestProtocol extends CborSerializable
|
||||
final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol
|
||||
final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol
|
||||
final case class WhoAreYou2(x: Int, replyTo: ActorRef[String]) extends TestProtocol
|
||||
final case class StopPlz() extends TestProtocol
|
||||
final case class PassivatePlz() extends TestProtocol
|
||||
|
||||
sealed trait IdTestProtocol extends java.io.Serializable
|
||||
sealed trait IdTestProtocol extends CborSerializable
|
||||
final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol
|
||||
final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol
|
||||
final case class IdStopPlz() extends IdTestProtocol
|
||||
|
||||
class Serializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
|
||||
def identifier: Int = 48
|
||||
def manifest(o: AnyRef): String = o match {
|
||||
case _: ReplyPlz => "a"
|
||||
case _: WhoAreYou => "b"
|
||||
case _: StopPlz => "c"
|
||||
case _: PassivatePlz => "d"
|
||||
case _: IdReplyPlz => "A"
|
||||
case _: IdWhoAreYou => "B"
|
||||
case _: IdStopPlz => "C"
|
||||
}
|
||||
|
||||
private def actorRefToBinary(ref: ActorRef[_]): Array[Byte] =
|
||||
ActorRefResolver(system.toTyped).toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8)
|
||||
|
||||
private def idAndRefToBinary(id: String, ref: ActorRef[_]): Array[Byte] = {
|
||||
val idBytes = id.getBytes(StandardCharsets.UTF_8)
|
||||
val refBytes = actorRefToBinary(ref)
|
||||
// yeah, very ad-hoc ;-)
|
||||
Array(idBytes.length.toByte) ++ idBytes ++ refBytes
|
||||
}
|
||||
|
||||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||
case ReplyPlz(ref) => actorRefToBinary(ref)
|
||||
case WhoAreYou(ref) => actorRefToBinary(ref)
|
||||
case _: StopPlz => Array.emptyByteArray
|
||||
case _: PassivatePlz => Array.emptyByteArray
|
||||
case IdReplyPlz(id, ref) => idAndRefToBinary(id, ref)
|
||||
case IdWhoAreYou(id, ref) => idAndRefToBinary(id, ref)
|
||||
case _: IdStopPlz => Array.emptyByteArray
|
||||
}
|
||||
|
||||
private def actorRefFromBinary[T](bytes: Array[Byte]): ActorRef[T] =
|
||||
ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
|
||||
|
||||
private def idAndRefFromBinary[T](bytes: Array[Byte]): (String, ActorRef[T]) = {
|
||||
val idLength = bytes(0)
|
||||
val id = new String(bytes.slice(1, idLength + 1), StandardCharsets.UTF_8)
|
||||
val ref = actorRefFromBinary(bytes.drop(1 + idLength))
|
||||
(id, ref)
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
|
||||
case "a" => ReplyPlz(actorRefFromBinary(bytes))
|
||||
case "b" => WhoAreYou(actorRefFromBinary(bytes))
|
||||
case "c" => StopPlz()
|
||||
case "d" => PassivatePlz()
|
||||
case "A" => IdReplyPlz.tupled(idAndRefFromBinary(bytes))
|
||||
case "B" => IdWhoAreYou.tupled(idAndRefFromBinary(bytes))
|
||||
case "C" => IdStopPlz()
|
||||
}
|
||||
}
|
||||
|
||||
final case class TheReply(s: String)
|
||||
|
||||
val typeKeyWithEnvelopes = EntityTypeKey[TestProtocol]("envelope-shard")
|
||||
|
|
|
|||
|
|
@ -48,7 +48,13 @@ public class PingSerializerExampleTest {
|
|||
public String manifest(Object obj) {
|
||||
if (obj instanceof Ping) return PING_MANIFEST;
|
||||
else if (obj instanceof Pong) return PONG_MANIFEST;
|
||||
else throw new IllegalArgumentException("Unknown type: " + obj);
|
||||
else
|
||||
throw new IllegalArgumentException(
|
||||
"Can't serialize object of type "
|
||||
+ obj.getClass()
|
||||
+ " in ["
|
||||
+ getClass().getName()
|
||||
+ "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -58,11 +64,17 @@ public class PingSerializerExampleTest {
|
|||
.toSerializationFormat(((Ping) obj).replyTo)
|
||||
.getBytes(StandardCharsets.UTF_8);
|
||||
else if (obj instanceof Pong) return new byte[0];
|
||||
else throw new IllegalArgumentException("Unknown type: " + obj);
|
||||
else
|
||||
throw new IllegalArgumentException(
|
||||
"Can't serialize object of type "
|
||||
+ obj.getClass()
|
||||
+ " in ["
|
||||
+ getClass().getName()
|
||||
+ "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object fromBinary(byte[] bytes, String manifest) throws NotSerializableException {
|
||||
public Object fromBinary(byte[] bytes, String manifest) {
|
||||
if (PING_MANIFEST.equals(manifest)) {
|
||||
String str = new String(bytes, StandardCharsets.UTF_8);
|
||||
ActorRef<Pong> ref = actorRefResolver.resolveActorRef(str);
|
||||
|
|
@ -70,7 +82,7 @@ public class PingSerializerExampleTest {
|
|||
} else if (PONG_MANIFEST.equals(manifest)) {
|
||||
return new Pong();
|
||||
} else {
|
||||
throw new NotSerializableException("Unable to handle manifest: " + manifest);
|
||||
throw new IllegalArgumentException("Unable to handle manifest: " + manifest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package akka.cluster.typed
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -11,21 +13,50 @@ import scala.util.control.NonFatal
|
|||
|
||||
import akka.Done
|
||||
import akka.actor.CoordinatedShutdown
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.InvalidMessageException
|
||||
import akka.actor.testkit.typed.scaladsl.TestInbox
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorRefResolver
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.PostStop
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest._
|
||||
import org.scalatest.concurrent.Eventually
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalatest.time.Span
|
||||
|
||||
object ActorSystemSpec {
|
||||
|
||||
class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
|
||||
// Reproducer of issue #24620, by eagerly creating the ActorRefResolver in serializer
|
||||
private val actorRefResolver = ActorRefResolver(system.toTyped)
|
||||
|
||||
def identifier: Int = 47
|
||||
def manifest(o: AnyRef): String =
|
||||
"a"
|
||||
|
||||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||
case TestMessage(ref) => actorRefResolver.toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8)
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
|
||||
case "a" => TestMessage(actorRefResolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
|
||||
case _ => throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
|
||||
}
|
||||
}
|
||||
|
||||
final case class TestMessage(ref: ActorRef[String])
|
||||
|
||||
}
|
||||
|
||||
class ActorSystemSpec
|
||||
extends WordSpec
|
||||
with Matchers
|
||||
|
|
@ -41,6 +72,13 @@ class ActorSystemSpec
|
|||
akka.remote.classic.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.remote.artery.canonical.hostname = 127.0.0.1
|
||||
|
||||
serializers {
|
||||
test = "akka.cluster.typed.ActorSystemSpec$$TestSerializer"
|
||||
}
|
||||
serialization-bindings {
|
||||
"akka.cluster.typed.ActorSystemSpec$$TestMessage" = test
|
||||
}
|
||||
""")
|
||||
def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name, config)
|
||||
def suite = "adapter"
|
||||
|
|
|
|||
|
|
@ -4,38 +4,24 @@
|
|||
|
||||
package akka.cluster.typed
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.testkit.typed.TestKitSettings
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.typed.{ ActorRef, ActorRefResolver }
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.TestKitSettings
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
object ClusterSingletonApiSpec {
|
||||
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka.actor {
|
||||
provider = cluster
|
||||
|
||||
serializers {
|
||||
test = "akka.cluster.typed.ClusterSingletonApiSpec$$PingSerializer"
|
||||
}
|
||||
serialization-bindings {
|
||||
"akka.cluster.typed.ClusterSingletonApiSpec$$Ping" = test
|
||||
"akka.cluster.typed.ClusterSingletonApiSpec$$Pong$$" = test
|
||||
"akka.cluster.typed.ClusterSingletonApiSpec$$Perish$$" = test
|
||||
}
|
||||
}
|
||||
akka.actor.provider = cluster
|
||||
akka.remote.classic.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.remote.artery.canonical.hostname = 127.0.0.1
|
||||
|
|
@ -43,10 +29,10 @@ object ClusterSingletonApiSpec {
|
|||
""")
|
||||
|
||||
trait PingProtocol
|
||||
case object Pong
|
||||
case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol
|
||||
case object Pong extends CborSerializable
|
||||
case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol with CborSerializable
|
||||
|
||||
case object Perish extends PingProtocol
|
||||
case object Perish extends PingProtocol with CborSerializable
|
||||
|
||||
val pingPong = Behaviors.receive[PingProtocol] { (_, msg) =>
|
||||
msg match {
|
||||
|
|
@ -60,29 +46,6 @@ object ClusterSingletonApiSpec {
|
|||
|
||||
}
|
||||
|
||||
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
|
||||
// Reproducer of issue #24620, by eagerly creating the ActorRefResolver in serializer
|
||||
val actorRefResolver = ActorRefResolver(system.toTyped)
|
||||
|
||||
def identifier: Int = 47
|
||||
def manifest(o: AnyRef): String = o match {
|
||||
case _: Ping => "a"
|
||||
case Pong => "b"
|
||||
case Perish => "c"
|
||||
}
|
||||
|
||||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||
case p: Ping => actorRefResolver.toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
|
||||
case Pong => Array.emptyByteArray
|
||||
case Perish => Array.emptyByteArray
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
|
||||
case "a" => Ping(actorRefResolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
|
||||
case "b" => Pong
|
||||
case "c" => Perish
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ClusterSingletonApiSpec
|
||||
|
|
|
|||
|
|
@ -4,59 +4,29 @@
|
|||
|
||||
package akka.cluster.typed
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.typed.receptionist.Receptionist.Registered
|
||||
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.{ ActorRef, ActorRefResolver, ActorSystem }
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.receptionist.Receptionist
|
||||
import akka.actor.typed.receptionist.Receptionist.Registered
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
import akka.util.Timeout
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.{ Failure, Success }
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
class RemoteContextAskSpecSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
|
||||
override def identifier = 41
|
||||
override def manifest(o: AnyRef) = o match {
|
||||
case _: RemoteContextAskSpec.Ping => "a"
|
||||
case RemoteContextAskSpec.Pong => "b"
|
||||
}
|
||||
override def toBinary(o: AnyRef) = o match {
|
||||
case RemoteContextAskSpec.Ping(who) =>
|
||||
ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
|
||||
case RemoteContextAskSpec.Pong => Array.emptyByteArray
|
||||
}
|
||||
override def fromBinary(bytes: Array[Byte], manifest: String) = manifest match {
|
||||
case "a" =>
|
||||
val str = new String(bytes, StandardCharsets.UTF_8)
|
||||
val ref = ActorRefResolver(system.toTyped).resolveActorRef[RemoteContextAskSpec.Pong.type](str)
|
||||
RemoteContextAskSpec.Ping(ref)
|
||||
case "b" => RemoteContextAskSpec.Pong
|
||||
}
|
||||
}
|
||||
|
||||
object RemoteContextAskSpec {
|
||||
def config = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
loglevel = debug
|
||||
actor {
|
||||
provider = cluster
|
||||
serializers {
|
||||
test = "akka.cluster.typed.RemoteContextAskSpecSerializer"
|
||||
}
|
||||
serialization-bindings {
|
||||
"akka.cluster.typed.RemoteContextAskSpec$$Ping" = test
|
||||
"akka.cluster.typed.RemoteContextAskSpec$$Pong$$" = test
|
||||
}
|
||||
}
|
||||
actor.provider = cluster
|
||||
remote.classic.netty.tcp.port = 0
|
||||
remote.classic.netty.tcp.host = 127.0.0.1
|
||||
remote.artery {
|
||||
|
|
@ -68,8 +38,8 @@ object RemoteContextAskSpec {
|
|||
}
|
||||
""")
|
||||
|
||||
case object Pong
|
||||
case class Ping(respondTo: ActorRef[Pong.type])
|
||||
case object Pong extends CborSerializable
|
||||
case class Ping(respondTo: ActorRef[Pong.type]) extends CborSerializable
|
||||
|
||||
def pingPong = Behaviors.receive[Ping] { (_, msg) =>
|
||||
msg match {
|
||||
|
|
|
|||
|
|
@ -4,46 +4,23 @@
|
|||
|
||||
package akka.cluster.typed
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import akka.Done
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.typed.{ ActorRef, ActorRefResolver }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.{ ExtendedActorSystem, ActorSystem => ClassicActorSystem }
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.concurrent.Promise
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorRefResolver
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
|
||||
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
|
||||
override def identifier = 41
|
||||
override def manifest(o: AnyRef) = "a"
|
||||
override def toBinary(o: AnyRef) = o match {
|
||||
case RemoteMessageSpec.Ping(who) =>
|
||||
ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
|
||||
}
|
||||
override def fromBinary(bytes: Array[Byte], manifest: String) = {
|
||||
val str = new String(bytes, StandardCharsets.UTF_8)
|
||||
val ref = ActorRefResolver(system.toTyped).resolveActorRef[String](str)
|
||||
RemoteMessageSpec.Ping(ref)
|
||||
}
|
||||
}
|
||||
import akka.actor.{ ActorSystem => ClassicActorSystem }
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
import akka.testkit.AkkaSpec
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object RemoteMessageSpec {
|
||||
def config = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
loglevel = debug
|
||||
actor {
|
||||
provider = cluster
|
||||
serializers {
|
||||
test = "akka.cluster.typed.PingSerializer"
|
||||
}
|
||||
serialization-bindings {
|
||||
"akka.cluster.typed.RemoteMessageSpec$$Ping" = test
|
||||
}
|
||||
}
|
||||
actor.provider = cluster
|
||||
remote.classic.netty.tcp.port = 0
|
||||
remote.artery {
|
||||
canonical {
|
||||
|
|
@ -54,7 +31,7 @@ object RemoteMessageSpec {
|
|||
}
|
||||
""")
|
||||
|
||||
case class Ping(sender: ActorRef[String])
|
||||
case class Ping(sender: ActorRef[String]) extends CborSerializable
|
||||
}
|
||||
|
||||
class RemoteMessageSpec extends AkkaSpec(RemoteMessageSpec.config) {
|
||||
|
|
|
|||
|
|
@ -4,42 +4,35 @@
|
|||
|
||||
package akka.cluster.typed.internal.receptionist
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import akka.actor.{ ExtendedActorSystem, RootActorPath }
|
||||
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.{ ActorRef, ActorRefResolver }
|
||||
import akka.cluster.MemberStatus
|
||||
import akka.cluster.typed.{ Cluster, Join }
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import akka.actor.testkit.typed.FishingOutcome
|
||||
import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, FishingOutcomes, TestProbe }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.{ Matchers, WordSpec }
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.RootActorPath
|
||||
import akka.actor.testkit.typed.FishingOutcome
|
||||
import akka.actor.testkit.typed.scaladsl.ActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.FishingOutcomes
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.receptionist.Receptionist
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.cluster.MemberStatus
|
||||
import akka.cluster.typed.Cluster
|
||||
import akka.cluster.typed.Down
|
||||
import akka.cluster.typed.Join
|
||||
import akka.cluster.typed.JoinSeedNodes
|
||||
import akka.cluster.typed.Leave
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpec
|
||||
|
||||
object ClusterReceptionistSpec {
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka.loglevel = DEBUG # issue #24960
|
||||
akka.actor {
|
||||
provider = cluster
|
||||
serializers {
|
||||
test = "akka.cluster.typed.internal.receptionist.ClusterReceptionistSpec$$PingSerializer"
|
||||
}
|
||||
serialization-bindings {
|
||||
"akka.cluster.typed.internal.receptionist.ClusterReceptionistSpec$$Ping" = test
|
||||
"akka.cluster.typed.internal.receptionist.ClusterReceptionistSpec$$Pong$$" = test
|
||||
"akka.cluster.typed.internal.receptionist.ClusterReceptionistSpec$$Perish$$" = test
|
||||
}
|
||||
}
|
||||
akka.actor.provider = cluster
|
||||
akka.remote.classic.netty.tcp.port = 0
|
||||
akka.remote.classic.netty.tcp.host = 127.0.0.1
|
||||
akka.remote.artery.canonical.port = 0
|
||||
|
|
@ -57,10 +50,10 @@ object ClusterReceptionistSpec {
|
|||
}
|
||||
""")
|
||||
|
||||
case object Pong
|
||||
case object Pong extends CborSerializable
|
||||
trait PingProtocol
|
||||
case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol
|
||||
case object Perish extends PingProtocol
|
||||
case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol with CborSerializable
|
||||
case object Perish extends PingProtocol with CborSerializable
|
||||
|
||||
val pingPongBehavior = Behaviors.receive[PingProtocol] { (_, msg) =>
|
||||
msg match {
|
||||
|
|
@ -73,28 +66,6 @@ object ClusterReceptionistSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
|
||||
def identifier: Int = 47
|
||||
def manifest(o: AnyRef): String = o match {
|
||||
case _: Ping => "a"
|
||||
case Pong => "b"
|
||||
case Perish => "c"
|
||||
}
|
||||
|
||||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||
case p: Ping =>
|
||||
ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
|
||||
case Pong => Array.emptyByteArray
|
||||
case Perish => Array.emptyByteArray
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
|
||||
case "a" => Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
|
||||
case "b" => Pong
|
||||
case "c" => Perish
|
||||
}
|
||||
}
|
||||
|
||||
val PingKey = ServiceKey[PingProtocol]("pingy")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,6 +24,8 @@ class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringMa
|
|||
override def manifest(msg: AnyRef) = msg match {
|
||||
case _: PingService.Ping => PingManifest
|
||||
case PingService.Pong => PongManifest
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
||||
override def toBinary(msg: AnyRef) = msg match {
|
||||
|
|
@ -31,6 +33,8 @@ class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringMa
|
|||
actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
|
||||
case PingService.Pong =>
|
||||
Array.emptyByteArray
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
||||
override def fromBinary(bytes: Array[Byte], manifest: String) = {
|
||||
|
|
@ -41,6 +45,8 @@ class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringMa
|
|||
PingService.Ping(ref)
|
||||
case PongManifest =>
|
||||
PingService.Pong
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue