feat: Java API for ExternalShardAllocationStrategy (#1246)

support Java API in ExternalShardAllocationStrategy.
This commit is contained in:
AndyChen(Jingzhang) 2024-04-02 09:40:59 +08:00 committed by GitHub
parent 276a757d77
commit a11d4034aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 33 additions and 3 deletions

View file

@ -18,12 +18,14 @@ import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.cluster.sharding.external.ExternalShardAllocation;
import org.apache.pekko.cluster.sharding.external.ExternalShardAllocationStrategy;
import org.apache.pekko.cluster.sharding.external.javadsl.ExternalShardAllocationClient;
import org.apache.pekko.cluster.sharding.typed.ShardingEnvelope;
import org.apache.pekko.cluster.sharding.typed.javadsl.ClusterSharding;
import org.apache.pekko.cluster.sharding.typed.javadsl.Entity;
import org.apache.pekko.cluster.sharding.typed.javadsl.EntityTypeKey;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import static jdocs.org.apache.pekko.cluster.sharding.typed.ShardingCompileOnlyTest.Counter;
@ -39,7 +41,11 @@ public class ExternalShardAllocationCompileOnlyTest {
EntityTypeKey<Counter.Command> typeKey = EntityTypeKey.create(Counter.Command.class, "Counter");
ActorRef<ShardingEnvelope<Counter.Command>> shardRegion =
sharding.init(Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())));
sharding.init(
Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId()))
.withAllocationStrategy(
ExternalShardAllocationStrategy.create(
system, typeKey.name(), Duration.ofSeconds(5))));
// #entity
// #client

View file

@ -38,7 +38,7 @@ class ExternalShardAllocationCompileOnlySpec {
val TypeKey = EntityTypeKey[Counter.Command]("Counter")
val entity = Entity(TypeKey)(createBehavior = entityContext => Counter(entityContext.entityId))
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
.withAllocationStrategy(ExternalShardAllocationStrategy(system, TypeKey.name))
// #entity
val shardRegion: ActorRef[ShardingEnvelope[Counter.Command]] =

View file

@ -39,11 +39,35 @@ import pekko.cluster.sharding.ShardRegion.ShardId
import pekko.event.Logging
import pekko.pattern.AskTimeoutException
import pekko.util.Timeout
import pekko.util.JavaDurationConverters._
object ExternalShardAllocationStrategy {
type ShardRegion = ActorRef
/**
* Create an [[ExternalShardAllocationStrategy]]
*/
def apply(systemProvider: ClassicActorSystemProvider, typeName: String): ExternalShardAllocationStrategy = {
implicit val timeout: Timeout = 5.seconds
new ExternalShardAllocationStrategy(systemProvider, typeName)
}
/**
* Scala API: Create an [[ExternalShardAllocationStrategy]]
*/
def apply(systemProvider: ClassicActorSystemProvider, typeName: String, timeout: FiniteDuration)
: ExternalShardAllocationStrategy = {
new ExternalShardAllocationStrategy(systemProvider, typeName)(timeout)
}
/**
* Java API: Create an [[ExternalShardAllocationStrategy]]
*/
def create(systemProvider: ClassicActorSystemProvider, typeName: String, timeout: java.time.Duration)
: ExternalShardAllocationStrategy =
this.apply(systemProvider, typeName, timeout.asScala)
// local only messages
private[pekko] final case class GetShardLocation(shard: ShardId)
private[pekko] case object GetShardLocations

View file

@ -105,7 +105,7 @@ abstract class ExternalShardAllocationSpec
entityProps = Props[GiveMeYourHome](),
extractEntityId = extractEntityId,
extractShardId = extractShardId,
allocationStrategy = new ExternalShardAllocationStrategy(system, typeName))
allocationStrategy = ExternalShardAllocationStrategy(system, typeName))
"start cluster sharding" in {
shardRegion