Allow introspection on typed EntityRefs to support serialization #29955

This commit is contained in:
Levi Ramsey 2021-02-16 03:19:36 -05:00 committed by GitHub
parent d92dc9c321
commit d51f1e17b4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 106 additions and 15 deletions

View file

@ -0,0 +1,8 @@
# Changes to @DoNotInherit classes
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.entityId")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.typeKey")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.dataCenter")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.internal.EntityRefImpl.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.internal.testkit.TestEntityRefImpl.this")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityTypeKey.asJava")

View file

@ -85,6 +85,8 @@ import akka.util.JavaDurationConverters._
with scaladsl.EntityTypeKey[T] { with scaladsl.EntityTypeKey[T] {
override def toString: String = s"EntityTypeKey[$messageClassName]($name)" override def toString: String = s"EntityTypeKey[$messageClassName]($name)"
private[akka] def asJava: javadsl.EntityTypeKey[T] = this
} }
/** INTERNAL API */ /** INTERNAL API */
@ -251,12 +253,13 @@ import akka.util.JavaDurationConverters._
entityId: String, entityId: String,
dataCenter: DataCenter): scaladsl.EntityRef[M] = { dataCenter: DataCenter): scaladsl.EntityRef[M] = {
if (dataCenter == cluster.selfMember.dataCenter) if (dataCenter == cluster.selfMember.dataCenter)
entityRefFor(typeKey, entityId) entityRefFor(typeKey, entityId).asInstanceOf[EntityRefImpl[M]].withDataCenter(Some(dataCenter))
else else
new EntityRefImpl[M]( new EntityRefImpl[M](
classicSharding.shardRegionProxy(typeKey.name, dataCenter), classicSharding.shardRegionProxy(typeKey.name, dataCenter),
entityId, entityId,
typeKey.asInstanceOf[EntityTypeKeyImpl[M]]) typeKey.asInstanceOf[EntityTypeKeyImpl[M]],
Some(dataCenter))
} }
override def entityRefFor[M](typeKey: javadsl.EntityTypeKey[M], entityId: String): javadsl.EntityRef[M] = { override def entityRefFor[M](typeKey: javadsl.EntityTypeKey[M], entityId: String): javadsl.EntityRef[M] = {
@ -271,12 +274,13 @@ import akka.util.JavaDurationConverters._
entityId: String, entityId: String,
dataCenter: String): javadsl.EntityRef[M] = { dataCenter: String): javadsl.EntityRef[M] = {
if (dataCenter == cluster.selfMember.dataCenter) if (dataCenter == cluster.selfMember.dataCenter)
entityRefFor(typeKey, entityId) entityRefFor(typeKey, entityId).asInstanceOf[EntityRefImpl[M]].withDataCenter(Some(dataCenter))
else else
new EntityRefImpl[M]( new EntityRefImpl[M](
classicSharding.shardRegionProxy(typeKey.name, dataCenter), classicSharding.shardRegionProxy(typeKey.name, dataCenter),
entityId, entityId,
typeKey.asInstanceOf[EntityTypeKeyImpl[M]]) typeKey.asInstanceOf[EntityTypeKeyImpl[M]],
Some(dataCenter))
} }
override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
@ -306,12 +310,26 @@ import akka.util.JavaDurationConverters._
*/ */
@InternalApi private[akka] final class EntityRefImpl[M]( @InternalApi private[akka] final class EntityRefImpl[M](
shardRegion: akka.actor.ActorRef, shardRegion: akka.actor.ActorRef,
entityId: String, override val entityId: String,
typeKey: EntityTypeKeyImpl[M]) override val typeKey: EntityTypeKeyImpl[M],
override val dataCenter: Option[String] = None)
extends javadsl.EntityRef[M] extends javadsl.EntityRef[M]
with scaladsl.EntityRef[M] with scaladsl.EntityRef[M]
with InternalRecipientRef[M] { with InternalRecipientRef[M] {
override def hashCode(): Int =
// 3 and 5 chosen as primes which are +/- 1 from a power-of-two
((entityId.hashCode * 3) + typeKey.hashCode) * 5 + dataCenter.hashCode
override def equals(other: Any): Boolean =
other match {
case eri: EntityRefImpl[_] =>
(eri.entityId == entityId) &&
(eri.typeKey == typeKey) &&
(eri.dataCenter == dataCenter)
case _ => false
}
override val refPrefix = URLEncoder.encode(s"${typeKey.name}-$entityId", ByteString.UTF_8) override val refPrefix = URLEncoder.encode(s"${typeKey.name}-$entityId", ByteString.UTF_8)
override def tell(msg: M): Unit = override def tell(msg: M): Unit =
@ -399,6 +417,9 @@ import akka.util.JavaDurationConverters._
* INTERNAL API * INTERNAL API
*/ */
override private[akka] def asJava: javadsl.EntityRef[M] = this override private[akka] def asJava: javadsl.EntityRef[M] = this
private[internal] def withDataCenter(dataCenter: Option[String]): EntityRefImpl[M] =
new EntityRefImpl[M](shardRegion, entityId, typeKey, dataCenter)
} }
/** /**

View file

@ -25,12 +25,18 @@ import akka.util.Timeout
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final class TestEntityRefImpl[M](entityId: String, probe: ActorRef[M]) @InternalApi private[akka] final class TestEntityRefImpl[M](
override val entityId: String,
probe: ActorRef[M],
override val typeKey: scaladsl.EntityTypeKey[M])
extends javadsl.EntityRef[M] extends javadsl.EntityRef[M]
with scaladsl.EntityRef[M] with scaladsl.EntityRef[M]
with InternalRecipientRef[M] { with InternalRecipientRef[M] {
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
override def dataCenter: Option[String] = None
override def tell(msg: M): Unit = override def tell(msg: M): Unit =
probe ! msg probe ! msg

View file

@ -402,7 +402,7 @@ object StartEntity {
* *
* Not for user extension. * Not for user extension.
*/ */
@DoNotInherit abstract class EntityTypeKey[T] { scaladslSelf: scaladsl.EntityTypeKey[T] => @DoNotInherit abstract class EntityTypeKey[-T] { scaladslSelf: scaladsl.EntityTypeKey[T] =>
/** /**
* Name of the entity type. * Name of the entity type.
@ -442,6 +442,23 @@ object EntityTypeKey {
@DoNotInherit abstract class EntityRef[-M] extends RecipientRef[M] { @DoNotInherit abstract class EntityRef[-M] extends RecipientRef[M] {
scaladslSelf: scaladsl.EntityRef[M] with InternalRecipientRef[M] => scaladslSelf: scaladsl.EntityRef[M] with InternalRecipientRef[M] =>
/**
* The identifier for the particular entity referenced by this EntityRef.
*/
def getEntityId: String = entityId
/**
* The name of the EntityTypeKey associated with this EntityRef
*/
def getTypeKey: javadsl.EntityTypeKey[M] = typeKey.asJava
/**
* The specified datacenter of the incarnation of the particular entity referenced by this EntityRef,
* if a datacenter was specified.
*/
def getDataCenter: Optional[String] =
Optional.ofNullable(dataCenter.orNull)
/** /**
* Send a message to the entity referenced by this EntityRef using *at-most-once* * Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics. * messaging semantics.

View file

@ -391,6 +391,7 @@ object StartEntity {
*/ */
def name: String def name: String
private[akka] def asJava: javadsl.EntityTypeKey[T]
} }
object EntityTypeKey { object EntityTypeKey {
@ -417,6 +418,27 @@ object EntityTypeKey {
*/ */
@DoNotInherit trait EntityRef[-M] extends RecipientRef[M] { this: InternalRecipientRef[M] => @DoNotInherit trait EntityRef[-M] extends RecipientRef[M] { this: InternalRecipientRef[M] =>
/**
* The identifier for the particular entity referenced by this EntityRef.
*
* {{{
* // given sharding, typeKey
* sharding.entityRefFor(typeKey, "someId").entityId == "someId" // always true
* }}}
*/
def entityId: String
/**
* The EntityTypeKey associated with this EntityRef.
*/
def typeKey: EntityTypeKey[M]
/**
* The specified datacenter of the incarnation of the particular entity referenced by this EntityRef,
* if a datacenter was specified.
*/
def dataCenter: Option[String]
/** /**
* Send a message to the entity referenced by this EntityRef using *at-most-once* * Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics. * messaging semantics.

View file

@ -8,13 +8,12 @@ import akka.actor.typed.ActorRef
import akka.cluster.sharding.typed.internal.testkit.TestEntityRefImpl import akka.cluster.sharding.typed.internal.testkit.TestEntityRefImpl
import akka.cluster.sharding.typed.javadsl.EntityRef import akka.cluster.sharding.typed.javadsl.EntityRef
import akka.cluster.sharding.typed.javadsl.EntityTypeKey import akka.cluster.sharding.typed.javadsl.EntityTypeKey
import akka.util.unused
/** /**
* For testing purposes this `EntityRef` can be used in place of a real [[EntityRef]]. * For testing purposes this `EntityRef` can be used in place of a real [[EntityRef]].
* It forwards all messages to the `probe`. * It forwards all messages to the `probe`.
*/ */
object TestEntityRef { object TestEntityRef {
def of[M](@unused typeKey: EntityTypeKey[M], entityId: String, probe: ActorRef[M]): EntityRef[M] = def of[M](typeKey: EntityTypeKey[M], entityId: String, probe: ActorRef[M]): EntityRef[M] =
new TestEntityRefImpl[M](entityId, probe) new TestEntityRefImpl[M](entityId, probe, typeKey.asScala)
} }

View file

@ -8,13 +8,12 @@ import akka.actor.typed.ActorRef
import akka.cluster.sharding.typed.internal.testkit.TestEntityRefImpl import akka.cluster.sharding.typed.internal.testkit.TestEntityRefImpl
import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.util.unused
/** /**
* For testing purposes this `EntityRef` can be used in place of a real [[EntityRef]]. * For testing purposes this `EntityRef` can be used in place of a real [[EntityRef]].
* It forwards all messages to the `probe`. * It forwards all messages to the `probe`.
*/ */
object TestEntityRef { object TestEntityRef {
def apply[M](@unused typeKey: EntityTypeKey[M], entityId: String, probe: ActorRef[M]): EntityRef[M] = def apply[M](typeKey: EntityTypeKey[M], entityId: String, probe: ActorRef[M]): EntityRef[M] =
new TestEntityRefImpl[M](entityId, probe) new TestEntityRefImpl[M](entityId, probe, typeKey)
} }

View file

@ -84,7 +84,7 @@ Scala
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #init } : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #init }
Messages to a specific entity are then sent via an `EntityRef`. Messages to a specific entity are then sent via an `EntityRef`. The `entityId` and the name of the Entity's key can be retrieved from the `EntityRef`.
It is also possible to wrap methods in a `ShardingEnvelope` or define extractor functions and send messages directly to the shard region. It is also possible to wrap methods in a `ShardingEnvelope` or define extractor functions and send messages directly to the shard region.
Scala Scala
@ -107,7 +107,16 @@ Scala
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #roles } : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #roles }
### A note about EntityRef and serialization
If including `EntityRef`s in messages or the `State`/`Event`s of an `EventSourcedBehavior`, those `EntityRef`s will need to be serialized.
The @scala[`entityId`, `typeKey`, and (in multi-DC use-cases) `dataCenter` of an `EntityRef`]@java[`getEntityId`, `getTypeKey`, and (in multi-DC use-cases) `getDataCenter` methods of an `EntityRef`]
provide exactly the information needed upon deserialization to regenerate an `EntityRef` equivalent to the one serialized, given an expected
type of messages to send to the entity.
At this time, serialization of `EntityRef`s requires a @ref:[custom serializer](../serialization.md#customization), as the specific
`EntityTypeKey` (including the type of message which the desired entity type accepts) should not simply be encoded in the serialized
representation but looked up on the deserializing side.
## Persistence example ## Persistence example

View file

@ -546,3 +546,13 @@ Java
A disadvantage is that a message adapter can't be used so the response has to be in the protocol of the actor being responded to. Additionally the `EntityTypeKey` A disadvantage is that a message adapter can't be used so the response has to be in the protocol of the actor being responded to. Additionally the `EntityTypeKey`
could be included in the message if it is not known statically. could be included in the message if it is not known statically.
As an "alternative to the alternative", an @apidoc[typed.*.EntityRef] can be included in the messages. The `EntityRef` transparently wraps messages in a `ShardingEnvelope` and
sends them via sharding. If the target sharded entity has been passivated, it will be delivered to a new incarnation of that entity; if the target sharded entity
has been moved to a different cluster node, it will be routed to that new node. If using this approach, be aware that at this time, @ref:[a custom serializer is required](cluster-sharding.md#a-note-about-entityref-and-serialization).
As with directly including the `entityId` and `EntityTypeKey` in the message, `EntityRef`s do not support message adaptation: the response has to be in the protocol
of the entity being responded to.
In some cases, it may be useful to define messages with a @apidoc[akka.actor.typed.RecipientRef] which is a common supertype of `ActorRef` and `EntityRef`. At this time,
serializing a `RecipientRef` requires a custom serializer.