From d51f1e17b49a4fba45d25d54b30e9bb609e4ad56 Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Tue, 16 Feb 2021 03:19:36 -0500 Subject: [PATCH] Allow introspection on typed EntityRefs to support serialization #29955 --- .../pr-29960-entityref-introspection.excludes | 8 +++++ .../typed/internal/ClusterShardingImpl.scala | 33 +++++++++++++++---- .../internal/testkit/TestEntityRefImpl.scala | 8 ++++- .../typed/javadsl/ClusterSharding.scala | 19 ++++++++++- .../typed/scaladsl/ClusterSharding.scala | 22 +++++++++++++ .../typed/testkit/javadsl/EntityRef.scala | 5 ++- .../typed/testkit/scaladsl/EntityRef.scala | 5 ++- .../main/paradox/typed/cluster-sharding.md | 11 ++++++- .../paradox/typed/interaction-patterns.md | 10 ++++++ 9 files changed, 106 insertions(+), 15 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/main/mima-filters/2.6.12.backwards.excludes/pr-29960-entityref-introspection.excludes diff --git a/akka-cluster-sharding-typed/src/main/mima-filters/2.6.12.backwards.excludes/pr-29960-entityref-introspection.excludes b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.12.backwards.excludes/pr-29960-entityref-introspection.excludes new file mode 100644 index 0000000000..4aadc522b7 --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.12.backwards.excludes/pr-29960-entityref-introspection.excludes @@ -0,0 +1,8 @@ +# Changes to @DoNotInherit classes + +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.entityId") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.typeKey") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.dataCenter") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.internal.EntityRefImpl.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.internal.testkit.TestEntityRefImpl.this") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityTypeKey.asJava") diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 454a8db3ff..4206d415ed 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -85,6 +85,8 @@ import akka.util.JavaDurationConverters._ with scaladsl.EntityTypeKey[T] { override def toString: String = s"EntityTypeKey[$messageClassName]($name)" + + private[akka] def asJava: javadsl.EntityTypeKey[T] = this } /** INTERNAL API */ @@ -251,12 +253,13 @@ import akka.util.JavaDurationConverters._ entityId: String, dataCenter: DataCenter): scaladsl.EntityRef[M] = { if (dataCenter == cluster.selfMember.dataCenter) - entityRefFor(typeKey, entityId) + entityRefFor(typeKey, entityId).asInstanceOf[EntityRefImpl[M]].withDataCenter(Some(dataCenter)) else new EntityRefImpl[M]( classicSharding.shardRegionProxy(typeKey.name, dataCenter), entityId, - typeKey.asInstanceOf[EntityTypeKeyImpl[M]]) + typeKey.asInstanceOf[EntityTypeKeyImpl[M]], + Some(dataCenter)) } override def entityRefFor[M](typeKey: javadsl.EntityTypeKey[M], entityId: String): javadsl.EntityRef[M] = { @@ -271,12 +274,13 @@ import akka.util.JavaDurationConverters._ entityId: String, dataCenter: String): javadsl.EntityRef[M] = { if (dataCenter == cluster.selfMember.dataCenter) - entityRefFor(typeKey, entityId) + entityRefFor(typeKey, entityId).asInstanceOf[EntityRefImpl[M]].withDataCenter(Some(dataCenter)) else new EntityRefImpl[M]( classicSharding.shardRegionProxy(typeKey.name, dataCenter), entityId, - typeKey.asInstanceOf[EntityTypeKeyImpl[M]]) + typeKey.asInstanceOf[EntityTypeKeyImpl[M]], + Some(dataCenter)) } override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { @@ -306,12 +310,26 @@ import akka.util.JavaDurationConverters._ */ @InternalApi private[akka] final class EntityRefImpl[M]( shardRegion: akka.actor.ActorRef, - entityId: String, - typeKey: EntityTypeKeyImpl[M]) + override val entityId: String, + override val typeKey: EntityTypeKeyImpl[M], + override val dataCenter: Option[String] = None) extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] with InternalRecipientRef[M] { + override def hashCode(): Int = + // 3 and 5 chosen as primes which are +/- 1 from a power-of-two + ((entityId.hashCode * 3) + typeKey.hashCode) * 5 + dataCenter.hashCode + + override def equals(other: Any): Boolean = + other match { + case eri: EntityRefImpl[_] => + (eri.entityId == entityId) && + (eri.typeKey == typeKey) && + (eri.dataCenter == dataCenter) + case _ => false + } + override val refPrefix = URLEncoder.encode(s"${typeKey.name}-$entityId", ByteString.UTF_8) override def tell(msg: M): Unit = @@ -399,6 +417,9 @@ import akka.util.JavaDurationConverters._ * INTERNAL API */ override private[akka] def asJava: javadsl.EntityRef[M] = this + + private[internal] def withDataCenter(dataCenter: Option[String]): EntityRefImpl[M] = + new EntityRefImpl[M](shardRegion, entityId, typeKey, dataCenter) } /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala index 36a5db1329..85db071f87 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala @@ -25,12 +25,18 @@ import akka.util.Timeout /** * INTERNAL API */ -@InternalApi private[akka] final class TestEntityRefImpl[M](entityId: String, probe: ActorRef[M]) +@InternalApi private[akka] final class TestEntityRefImpl[M]( + override val entityId: String, + probe: ActorRef[M], + override val typeKey: scaladsl.EntityTypeKey[M]) extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] with InternalRecipientRef[M] { + import akka.actor.typed.scaladsl.adapter._ + override def dataCenter: Option[String] = None + override def tell(msg: M): Unit = probe ! msg diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 8d63ca684b..866d6e7fed 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -402,7 +402,7 @@ object StartEntity { * * Not for user extension. */ -@DoNotInherit abstract class EntityTypeKey[T] { scaladslSelf: scaladsl.EntityTypeKey[T] => +@DoNotInherit abstract class EntityTypeKey[-T] { scaladslSelf: scaladsl.EntityTypeKey[T] => /** * Name of the entity type. @@ -442,6 +442,23 @@ object EntityTypeKey { @DoNotInherit abstract class EntityRef[-M] extends RecipientRef[M] { scaladslSelf: scaladsl.EntityRef[M] with InternalRecipientRef[M] => + /** + * The identifier for the particular entity referenced by this EntityRef. + */ + def getEntityId: String = entityId + + /** + * The name of the EntityTypeKey associated with this EntityRef + */ + def getTypeKey: javadsl.EntityTypeKey[M] = typeKey.asJava + + /** + * The specified datacenter of the incarnation of the particular entity referenced by this EntityRef, + * if a datacenter was specified. + */ + def getDataCenter: Optional[String] = + Optional.ofNullable(dataCenter.orNull) + /** * Send a message to the entity referenced by this EntityRef using *at-most-once* * messaging semantics. diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 70a4db3098..e6e3a32eda 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -391,6 +391,7 @@ object StartEntity { */ def name: String + private[akka] def asJava: javadsl.EntityTypeKey[T] } object EntityTypeKey { @@ -417,6 +418,27 @@ object EntityTypeKey { */ @DoNotInherit trait EntityRef[-M] extends RecipientRef[M] { this: InternalRecipientRef[M] => + /** + * The identifier for the particular entity referenced by this EntityRef. + * + * {{{ + * // given sharding, typeKey + * sharding.entityRefFor(typeKey, "someId").entityId == "someId" // always true + * }}} + */ + def entityId: String + + /** + * The EntityTypeKey associated with this EntityRef. + */ + def typeKey: EntityTypeKey[M] + + /** + * The specified datacenter of the incarnation of the particular entity referenced by this EntityRef, + * if a datacenter was specified. + */ + def dataCenter: Option[String] + /** * Send a message to the entity referenced by this EntityRef using *at-most-once* * messaging semantics. diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/testkit/javadsl/EntityRef.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/testkit/javadsl/EntityRef.scala index 836c8a6ea4..343f60916b 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/testkit/javadsl/EntityRef.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/testkit/javadsl/EntityRef.scala @@ -8,13 +8,12 @@ import akka.actor.typed.ActorRef import akka.cluster.sharding.typed.internal.testkit.TestEntityRefImpl import akka.cluster.sharding.typed.javadsl.EntityRef import akka.cluster.sharding.typed.javadsl.EntityTypeKey -import akka.util.unused /** * For testing purposes this `EntityRef` can be used in place of a real [[EntityRef]]. * It forwards all messages to the `probe`. */ object TestEntityRef { - def of[M](@unused typeKey: EntityTypeKey[M], entityId: String, probe: ActorRef[M]): EntityRef[M] = - new TestEntityRefImpl[M](entityId, probe) + def of[M](typeKey: EntityTypeKey[M], entityId: String, probe: ActorRef[M]): EntityRef[M] = + new TestEntityRefImpl[M](entityId, probe, typeKey.asScala) } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/testkit/scaladsl/EntityRef.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/testkit/scaladsl/EntityRef.scala index bbfe4013cf..86e880d850 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/testkit/scaladsl/EntityRef.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/testkit/scaladsl/EntityRef.scala @@ -8,13 +8,12 @@ import akka.actor.typed.ActorRef import akka.cluster.sharding.typed.internal.testkit.TestEntityRefImpl import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.cluster.sharding.typed.scaladsl.EntityTypeKey -import akka.util.unused /** * For testing purposes this `EntityRef` can be used in place of a real [[EntityRef]]. * It forwards all messages to the `probe`. */ object TestEntityRef { - def apply[M](@unused typeKey: EntityTypeKey[M], entityId: String, probe: ActorRef[M]): EntityRef[M] = - new TestEntityRefImpl[M](entityId, probe) + def apply[M](typeKey: EntityTypeKey[M], entityId: String, probe: ActorRef[M]): EntityRef[M] = + new TestEntityRefImpl[M](entityId, probe, typeKey) } diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 108f7d830b..85122ea25e 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -84,7 +84,7 @@ Scala Java : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #init } -Messages to a specific entity are then sent via an `EntityRef`. +Messages to a specific entity are then sent via an `EntityRef`. The `entityId` and the name of the Entity's key can be retrieved from the `EntityRef`. It is also possible to wrap methods in a `ShardingEnvelope` or define extractor functions and send messages directly to the shard region. Scala @@ -107,7 +107,16 @@ Scala Java : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #roles } +### A note about EntityRef and serialization +If including `EntityRef`s in messages or the `State`/`Event`s of an `EventSourcedBehavior`, those `EntityRef`s will need to be serialized. +The @scala[`entityId`, `typeKey`, and (in multi-DC use-cases) `dataCenter` of an `EntityRef`]@java[`getEntityId`, `getTypeKey`, and (in multi-DC use-cases) `getDataCenter` methods of an `EntityRef`] +provide exactly the information needed upon deserialization to regenerate an `EntityRef` equivalent to the one serialized, given an expected +type of messages to send to the entity. + +At this time, serialization of `EntityRef`s requires a @ref:[custom serializer](../serialization.md#customization), as the specific +`EntityTypeKey` (including the type of message which the desired entity type accepts) should not simply be encoded in the serialized +representation but looked up on the deserializing side. ## Persistence example diff --git a/akka-docs/src/main/paradox/typed/interaction-patterns.md b/akka-docs/src/main/paradox/typed/interaction-patterns.md index 07fff0353c..88a3f7e199 100644 --- a/akka-docs/src/main/paradox/typed/interaction-patterns.md +++ b/akka-docs/src/main/paradox/typed/interaction-patterns.md @@ -546,3 +546,13 @@ Java A disadvantage is that a message adapter can't be used so the response has to be in the protocol of the actor being responded to. Additionally the `EntityTypeKey` could be included in the message if it is not known statically. + +As an "alternative to the alternative", an @apidoc[typed.*.EntityRef] can be included in the messages. The `EntityRef` transparently wraps messages in a `ShardingEnvelope` and +sends them via sharding. If the target sharded entity has been passivated, it will be delivered to a new incarnation of that entity; if the target sharded entity +has been moved to a different cluster node, it will be routed to that new node. If using this approach, be aware that at this time, @ref:[a custom serializer is required](cluster-sharding.md#a-note-about-entityref-and-serialization). + +As with directly including the `entityId` and `EntityTypeKey` in the message, `EntityRef`s do not support message adaptation: the response has to be in the protocol +of the entity being responded to. + +In some cases, it may be useful to define messages with a @apidoc[akka.actor.typed.RecipientRef] which is a common supertype of `ActorRef` and `EntityRef`. At this time, +serializing a `RecipientRef` requires a custom serializer.