Active active sharding (#29364)

This commit is contained in:
Johan Andrén 2020-07-21 12:13:08 +02:00 committed by Christopher Batey
parent f531d1e57d
commit 9830988566
9 changed files with 754 additions and 5 deletions

View file

@ -22,7 +22,10 @@ import scala.collection.JavaConverters._
* Akka Cluster. * Akka Cluster.
* *
* This actor should be started once on each node where Active Active entities will run (the same nodes that you start * This actor should be started once on each node where Active Active entities will run (the same nodes that you start
* sharding on). * sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]]
* (FIXME not supported in Java yet)
* If using [[ActiveActiveSharding]] the replication can be enabled through [[ActiveActiveShardingSettings#withDirectReplication()]]
* instead of starting this actor manually.
* *
* Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas * Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas
* which can then fast forward their cross-replica event streams to improve latency while allowing less frequent poll * which can then fast forward their cross-replica event streams to improve latency while allowing less frequent poll
@ -49,6 +52,14 @@ object ActiveActiveShardingDirectReplication {
private final case class WrappedPublishedEvent(publishedEvent: PublishedEvent) extends Command private final case class WrappedPublishedEvent(publishedEvent: PublishedEvent) extends Command
/**
* Java API:
* Factory for when the self replica id is unknown (or multiple)
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def create[T](replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(None, replicaShardingProxies.asScala.toMap)
/** /**
* Java API: * Java API:
* @param selfReplica The replica id of the replica that runs on this node * @param selfReplica The replica id of the replica that runs on this node
@ -57,7 +68,14 @@ object ActiveActiveShardingDirectReplication {
def create[T]( def create[T](
selfReplica: ReplicaId, selfReplica: ReplicaId,
replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] = replicaShardingProxies: java.util.Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(selfReplica, replicaShardingProxies.asScala.toMap) apply(Some(selfReplica), replicaShardingProxies.asScala.toMap)
/**
* Scala API:
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def apply[T](replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(None, replicaShardingProxies)
/** /**
* Scala API: * Scala API:
@ -65,6 +83,11 @@ object ActiveActiveShardingDirectReplication {
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/ */
def apply[T](selfReplica: ReplicaId, replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] = def apply[T](selfReplica: ReplicaId, replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
apply(Some(selfReplica), replicaShardingProxies)
private def apply[T](
selfReplica: Option[ReplicaId],
replicaShardingProxies: Map[ReplicaId, ActorRef[T]]): Behavior[Command] =
Behaviors.setup[Command] { context => Behaviors.setup[Command] { context =>
context.log.debug( context.log.debug(
"Subscribing to event stream to forward events to [{}] sharded replicas", "Subscribing to event stream to forward events to [{}] sharded replicas",
@ -81,7 +104,7 @@ object ActiveActiveShardingDirectReplication {
replicaShardingProxies.foreach { replicaShardingProxies.foreach {
case (replica, proxy) => case (replica, proxy) =>
val envelopedEvent = ShardingEnvelope(event.persistenceId.id, event) val envelopedEvent = ShardingEnvelope(event.persistenceId.id, event)
if (replica != selfReplica) if (!selfReplica.contains(replica))
proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent
} }
Behaviors.same Behaviors.same

View file

@ -0,0 +1,68 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.cluster.sharding.typed.internal.ActiveActiveShardingExtensionImpl
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.persistence.typed.ReplicaId
import java.util.{ Map => JMap }
/**
* Extension for running active active in sharding by starting one separate instance of sharding per replica.
* The sharding instances can be confined to datacenters or cluster roles or run on the same set of cluster nodes.
*/
@ApiMayChange
object ActiveActiveShardingExtension extends ExtensionId[ActiveActiveShardingExtension] {
override def createExtension(system: ActorSystem[_]): ActiveActiveShardingExtension =
new ActiveActiveShardingExtensionImpl(system)
def get(system: ActorSystem[_]): ActiveActiveShardingExtension = apply(system)
}
/**
* Not for user extension.
*/
@DoNotInherit
@ApiMayChange
trait ActiveActiveShardingExtension extends Extension {
/**
* Init one instance sharding per replica in the given settings and return a [[ActiveActiveSharding]] representing those.
*
* @tparam M The type of messages the active active event sourced actor accepts
* @tparam E The type of envelope used for routing messages to actors, the same for all replicas
*
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ActiveActiveSharding]]
*/
def init[M, E](settings: ActiveActiveShardingSettings[M, E]): ActiveActiveSharding[M]
}
/**
* Represents the sharding instances for the replicas of one active active entity type
*
* Not for user extension.
*/
@DoNotInherit
@ApiMayChange
trait ActiveActiveSharding[M] {
/**
* Scala API: Returns the entity ref for each replica for user defined routing/replica selection
*/
def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]]
/**
* Java API: Returns the entity ref for each replica for user defined routing/replica selection
*/
def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]]
}

View file

@ -0,0 +1,105 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.javadsl.{ Entity => JEntity, EntityTypeKey => JEntityTypeKey }
import akka.persistence.typed.ReplicaId
import scala.collection.immutable
import scala.reflect.ClassTag
import scala.collection.JavaConverters._
import java.util.{ Set => JSet }
import akka.annotation.ApiMayChange
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
@ApiMayChange
object ActiveActiveShardingSettings {
/**
* Java API:
*
* @tparam M The type of messages the active active entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
def create[M, E](
messageClass: Class[M],
allReplicaIds: JSet[ReplicaId],
settingsPerReplicaFactory: akka.japi.function.Function3[
JEntityTypeKey[M],
ReplicaId,
JSet[ReplicaId],
ReplicaSettings[M, E]]): ActiveActiveShardingSettings[M, E] = {
implicit val classTag: ClassTag[M] = ClassTag(messageClass)
apply[M, E](allReplicaIds.asScala.toSet)((key, replica, _) =>
settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica, allReplicaIds))
}
/**
* Scala API:
*
* @tparam M The type of messages the active active entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
def apply[M: ClassTag, E](allReplicaIds: Set[ReplicaId])(
settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId, Set[ReplicaId]) => ReplicaSettings[M, E])
: ActiveActiveShardingSettings[M, E] = {
new ActiveActiveShardingSettings(allReplicaIds.map { replicaId =>
val typeKey = EntityTypeKey[M](replicaId.id)
settingsPerReplicaFactory(typeKey, replicaId, allReplicaIds)
}.toVector, directReplication = false)
}
}
/**
* @tparam M The type of messages the active active entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
@ApiMayChange
final class ActiveActiveShardingSettings[M, E] private (
val replicas: immutable.Seq[ReplicaSettings[M, E]],
val directReplication: Boolean) {
/**
* Start direct replication over sharding when active active sharding starts up, requires the entities
* to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]]
* to work.
*
* FIXME no support for enabling that in Java because bin/source comp.
*/
def withDirectReplication(): ActiveActiveShardingSettings[M, E] =
new ActiveActiveShardingSettings(replicas, directReplication = true)
}
@ApiMayChange
object ReplicaSettings {
/**
* Java API: Defines the [[akka.cluster.sharding.typed.javadsl.Entity]] to use for a given replica, note that the behavior
* can be a [[akka.persistence.typed.javadsl.ActiveActiveEventSourcedBehavior]] or an arbitrary non persistent
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.javadsl.EventSourcedBehavior]]
* as that requires a single writer and that would cause it to have multiple writers.
*/
def create[M, E](replicaId: ReplicaId, entity: JEntity[M, E]): ReplicaSettings[M, E] =
apply(replicaId, entity.toScala)
/**
* Scala API: Defines the [[akka.cluster.sharding.typed.scaladsl.Entity]] to use for a given replica, note that the behavior
* can be a behavior created with [[akka.persistence.typed.scaladsl.ActiveActiveEventSourcing]] or an arbitrary non persistent
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.scaladsl.EventSourcedBehavior]]
* as that requires a single writer and that would cause it to have multiple writers.
*/
def apply[M, E](replicaId: ReplicaId, entity: Entity[M, E]): ReplicaSettings[M, E] =
new ReplicaSettings(replicaId, entity)
}
/**
* Settings for a specific replica id in active active sharding
*/
@ApiMayChange
final class ReplicaSettings[M, E] private (val replicaId: ReplicaId, val entity: Entity[M, E])

View file

@ -0,0 +1,82 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.internal
import java.util.concurrent.atomic.AtomicLong
import java.util.{ Map => JMap }
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.cluster.sharding.typed.ActiveActiveShardingExtension
import akka.cluster.sharding.typed.ActiveActiveSharding
import akka.cluster.sharding.typed.ActiveActiveShardingSettings
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import org.slf4j.LoggerFactory
import akka.actor.typed.scaladsl.LoggerOps
import akka.cluster.sharding.typed.ActiveActiveShardingDirectReplication
import scala.collection.JavaConverters._
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class ActiveActiveShardingExtensionImpl(system: ActorSystem[_])
extends ActiveActiveShardingExtension {
private val counter = new AtomicLong(0)
private val logger = LoggerFactory.getLogger(getClass)
override def init[M, E](settings: ActiveActiveShardingSettings[M, E]): ActiveActiveSharding[M] = {
val sharding = ClusterSharding(system)
val initializedReplicas = settings.replicas.map { replicaSettings =>
// start up a sharding instance per replica id
logger.infoN(
"Starting Active Active sharding for replica [{}] (ShardType: [{}])",
replicaSettings.replicaId.id,
replicaSettings.entity.typeKey.name)
val regionOrProxy = sharding.init(replicaSettings.entity)
(replicaSettings.replicaId, replicaSettings.entity.typeKey, regionOrProxy)
}
if (settings.directReplication) {
logger.infoN("Starting Active Active Direct Replication")
val replicaToRegionOrProxy = initializedReplicas.map {
case (id, _, regionOrProxy) => id -> regionOrProxy
}.toMap
system.systemActorOf(
ActiveActiveShardingDirectReplication(replicaToRegionOrProxy),
s"activeActiveDirectReplication-${counter.incrementAndGet()}")
}
val replicaToTypeKey = initializedReplicas.map { case (id, typeKey, _) => id -> typeKey }.toMap
new ActiveActiveShardingImpl(sharding, replicaToTypeKey)
}
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class ActiveActiveShardingImpl[M](
sharding: ClusterSharding,
replicaTypeKeys: Map[ReplicaId, EntityTypeKey[M]])
extends ActiveActiveSharding[M] {
override def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]] =
replicaTypeKeys.map {
case (replicaId, typeKey) =>
replicaId -> sharding.entityRefFor(typeKey, PersistenceId.ofUniqueId(entityId).id)
}
override def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]] =
entityRefsFor(entityId).asJava
}

View file

@ -22,7 +22,7 @@ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.japi.function.{ Function => JFunction } import akka.japi.function.{ Function => JFunction }
import akka.pattern.StatusReply import akka.pattern.StatusReply
import scala.compat.java8.OptionConverters._
@FunctionalInterface @FunctionalInterface
trait EntityFactory[M] { trait EntityFactory[M] {
def apply(shardRegion: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M] def apply(shardRegion: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M]
@ -337,6 +337,22 @@ final class Entity[M, E] private (
dataCenter) dataCenter)
} }
/**
* INTERNAL API
*/
@InternalApi
private[akka] def toScala: akka.cluster.sharding.typed.scaladsl.Entity[M, E] =
new akka.cluster.sharding.typed.scaladsl.Entity(
eCtx => createBehavior(eCtx.toJava),
typeKey.asScala,
stopMessage.asScala,
entityProps,
settings.asScala,
messageExtractor.asScala,
allocationStrategy.asScala,
role.asScala,
dataCenter.asScala)
} }
/** /**

View file

@ -348,7 +348,18 @@ final class Entity[M, E] private[akka] (
final class EntityContext[M]( final class EntityContext[M](
val entityTypeKey: EntityTypeKey[M], val entityTypeKey: EntityTypeKey[M],
val entityId: String, val entityId: String,
val shard: ActorRef[ClusterSharding.ShardCommand]) val shard: ActorRef[ClusterSharding.ShardCommand]) {
/**
* INTERNAL API
*/
@InternalApi
private[akka] def toJava: akka.cluster.sharding.typed.javadsl.EntityContext[M] =
new akka.cluster.sharding.typed.javadsl.EntityContext[M](
entityTypeKey.asInstanceOf[EntityTypeKeyImpl[M]],
entityId,
shard)
}
/** Allows starting a specific Sharded Entity by its entity identifier */ /** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity { object StartEntity {

View file

@ -0,0 +1,255 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.MemberStatus;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.scaladsl.EntityRef;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.testkit.PersistenceTestKitPlugin;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import scala.util.Random;
import java.util.*;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
public class ActiveActiveShardingTest extends JUnitSuite {
static class MyActiveActiveStringSet
extends ActiveActiveEventSourcedBehavior<
MyActiveActiveStringSet.Command, String, Set<String>> {
interface Command {}
static class Add implements Command {
public final String text;
public Add(String text) {
this.text = text;
}
}
static class GetTexts implements Command {
public final ActorRef<Texts> replyTo;
public GetTexts(ActorRef<Texts> replyTo) {
this.replyTo = replyTo;
}
}
static class Texts {
public final Set<String> texts;
public Texts(Set<String> texts) {
this.texts = texts;
}
}
static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ActiveActiveEventSourcing.withSharedJournal(
entityId,
replicaId,
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
MyActiveActiveStringSet::new);
}
private MyActiveActiveStringSet(ActiveActiveContext activeActiveContext) {
super(activeActiveContext);
}
@Override
public Set<String> emptyState() {
return new HashSet<>();
}
@Override
public CommandHandler<Command, String, Set<String>> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(Add.class, add -> Effect().persist(add.text))
.onCommand(
GetTexts.class,
(state, get) -> {
// protective copy
get.replyTo.tell(new Texts(new HashSet<>(state)));
return Effect().none();
})
.build();
}
@Override
public EventHandler<Set<String>, String> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onAnyEvent(
(state, text) -> {
state.add(text);
return state;
});
}
}
public static class ProxyActor extends AbstractBehavior<ProxyActor.Command> {
interface Command {}
public static final class ForwardToRandom implements Command {
public final String entityId;
public final MyActiveActiveStringSet.Command message;
public ForwardToRandom(String entityId, MyActiveActiveStringSet.Command message) {
this.entityId = entityId;
this.message = message;
}
}
public static final class ForwardToAll implements Command {
public final String entityId;
public final MyActiveActiveStringSet.Command message;
public ForwardToAll(String entityId, MyActiveActiveStringSet.Command message) {
this.entityId = entityId;
this.message = message;
}
}
public static Behavior<Command> create() {
return Behaviors.setup(ProxyActor::new);
}
public static final Set<ReplicaId> ALL_REPLICAS =
Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C"))));
private final ActiveActiveSharding<MyActiveActiveStringSet.Command> aaSharding;
private ProxyActor(ActorContext<Command> context) {
super(context);
// #bootstrap
ActiveActiveShardingSettings<
MyActiveActiveStringSet.Command, ShardingEnvelope<MyActiveActiveStringSet.Command>>
aaShardingSettings =
ActiveActiveShardingSettings.create(
MyActiveActiveStringSet.Command.class,
ALL_REPLICAS,
// factory for replica settings for a given replica
(entityTypeKey, replicaId, allReplicas) ->
ReplicaSettings.create(
replicaId,
// use the replica id as typekey for sharding to get one sharding instance
// per replica
Entity.of(
entityTypeKey,
entityContext ->
// factory for the entity for a given entity in that replica
MyActiveActiveStringSet.create(
entityContext.getEntityId(), replicaId, allReplicas))
// potentially use replica id as role or dc in Akka multi dc for the
// sharding instance
// to control where replicas will live
// .withDataCenter(replicaId.id()))
.withRole(replicaId.id())));
ActiveActiveShardingExtension extension =
ActiveActiveShardingExtension.get(getContext().getSystem());
aaSharding = extension.init(aaShardingSettings);
// #bootstrap
}
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(ForwardToRandom.class, this::onForwardToRandom)
.onMessage(ForwardToAll.class, this::onForwardToAll)
.build();
}
private Behavior<Command> onForwardToRandom(ForwardToRandom forwardToRandom) {
Map<ReplicaId, EntityRef<MyActiveActiveStringSet.Command>> refs =
aaSharding.getEntityRefsFor(forwardToRandom.entityId);
int chosenIdx = new java.util.Random().nextInt(refs.size());
new ArrayList<>(refs.values()).get(chosenIdx).tell(forwardToRandom.message);
return this;
}
private Behavior<Command> onForwardToAll(ForwardToAll forwardToAll) {
// #all-entity-refs
Map<ReplicaId, EntityRef<MyActiveActiveStringSet.Command>> refs =
aaSharding.getEntityRefsFor(forwardToAll.entityId);
refs.forEach((replicaId, ref) -> ref.tell(forwardToAll.message));
// #all-entity-refs
return this;
}
}
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(
ConfigFactory.parseString(
" akka.loglevel = DEBUG\n"
+ " akka.loggers = [\"akka.testkit.SilenceAllTestEventListener\"]\n"
+ " akka.actor.provider = \"cluster\"\n"
+ " # pretend we're a node in all dc:s\n"
+ " akka.cluster.roles = [\"DC-A\", \"DC-B\", \"DC-C\"]\n"
+ " akka.remote.classic.netty.tcp.port = 0\n"
+ " akka.remote.artery.canonical.port = 0")
.withFallback(PersistenceTestKitPlugin.getInstance().config()));
@Rule public final LogCapturing logCapturing = new LogCapturing();
@Test
public void formClusterAndInteractWithReplicas() {
// join ourselves to form a one node cluster
Cluster node = Cluster.get(testKit.system());
node.manager().tell(new Join(node.selfMember().address()));
TestProbe<Object> testProbe = testKit.createTestProbe();
testProbe.awaitAssert(
() -> {
assertEquals(MemberStatus.up(), node.selfMember().status());
return null;
});
// forward messages to replicas
ActorRef<ProxyActor.Command> proxy = testKit.spawn(ProxyActor.create());
proxy.tell(new ProxyActor.ForwardToAll("id1", new MyActiveActiveStringSet.Add("to-all")));
proxy.tell(new ProxyActor.ForwardToRandom("id1", new MyActiveActiveStringSet.Add("to-random")));
testProbe.awaitAssert(
() -> {
TestProbe<MyActiveActiveStringSet.Texts> responseProbe = testKit.createTestProbe();
proxy.tell(
new ProxyActor.ForwardToAll(
"id1", new MyActiveActiveStringSet.GetTexts(responseProbe.ref())));
List<MyActiveActiveStringSet.Texts> responses = responseProbe.receiveSeveralMessages(3);
Set<String> uniqueTexts =
responses.stream().flatMap(res -> res.texts.stream()).collect(Collectors.toSet());
assertEquals(new HashSet<>(Arrays.asList("to-all", "to-random")), uniqueTexts);
return null;
});
}
}

View file

@ -0,0 +1,146 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MemberStatus
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import scala.util.Random
object ActiveActiveShardingSpec {
def config = ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.actor.provider = "cluster"
# pretend we're a node in all dc:s
akka.cluster.roles = ["DC-A", "DC-B", "DC-C"]
akka.remote.classic.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0""").withFallback(PersistenceTestKitPlugin.config)
}
class ActiveActiveShardingSpec
extends ScalaTestWithActorTestKit(ActiveActiveShardingSpec.config)
with AnyWordSpecLike
with LogCapturing {
object MyActiveActiveStringSet {
trait Command extends CborSerializable
case class Add(text: String) extends Command
case class GetTexts(replyTo: ActorRef[Texts]) extends Command
case class Texts(texts: Set[String]) extends CborSerializable
def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] =
ActiveActiveEventSourcing.withSharedJournal(
entityId,
replicaId,
allReplicas,
PersistenceTestKitReadJournal.Identifier) { aaContext =>
EventSourcedBehavior[Command, String, Set[String]](
aaContext.persistenceId,
Set.empty[String],
(state, command) =>
command match {
case Add(text) =>
Effect.persist(text)
case GetTexts(replyTo) =>
replyTo ! Texts(state)
Effect.none
},
(state, event) => state + event).withJournalPluginId(PersistenceTestKitPlugin.PluginId)
}
}
object ProxyActor {
sealed trait Command
case class ForwardToRandom(entityId: String, msg: MyActiveActiveStringSet.Command) extends Command
case class ForwardToAll(entityId: String, msg: MyActiveActiveStringSet.Command) extends Command
def apply(): Behavior[Command] = Behaviors.setup { context =>
// #bootstrap
val aaShardingSettings =
ActiveActiveShardingSettings[
MyActiveActiveStringSet.Command,
ShardingEnvelope[MyActiveActiveStringSet.Command]](
// all replicas
Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) =>
// factory for replica settings for a given replica
ReplicaSettings(
replicaId,
// use the provided entity type key for sharding to get one sharding instance per replica
Entity(entityTypeKey) { entityContext =>
// factory for the entity for a given entity in that replica
MyActiveActiveStringSet(entityContext.entityId, replicaId, allReplicaIds)
}
// potentially use replica id as role or dc in Akka multi dc for the sharding instance
// to control where replicas will live
// .withDataCenter(replicaId.id))
.withRole(replicaId.id))
}
val aaSharding = ActiveActiveShardingExtension(context.system).init(aaShardingSettings)
// #bootstrap
Behaviors.receiveMessage {
case ForwardToAll(entityId, cmd) =>
// #all-entity-refs
aaSharding.entityRefsFor(entityId).foreach {
case (_, ref) => ref ! cmd
}
// #all-entity-refs
Behaviors.same
case ForwardToRandom(entityId, cmd) =>
val refs = aaSharding.entityRefsFor(entityId)
val chosenIdx = (new Random()).nextInt(refs.size)
refs.values.toIndexedSeq(chosenIdx) ! cmd;
Behaviors.same
}
}
}
"Active active sharding" should {
"form a one node cluster" in {
val node = Cluster(system)
node.manager ! Join(node.selfMember.address)
eventually {
node.selfMember.status should ===(MemberStatus.Up)
}
}
"forward to replicas" in {
val proxy = spawn(ProxyActor())
proxy ! ProxyActor.ForwardToAll("id1", MyActiveActiveStringSet.Add("to-all"))
proxy ! ProxyActor.ForwardToRandom("id1", MyActiveActiveStringSet.Add("to-random"))
eventually {
val probe = createTestProbe[MyActiveActiveStringSet.Texts]()
proxy ! ProxyActor.ForwardToAll("id1", MyActiveActiveStringSet.GetTexts(probe.ref))
val responses: Seq[MyActiveActiveStringSet.Texts] = probe.receiveMessages(3)
val uniqueTexts = responses.flatMap(res => res.texts).toSet
uniqueTexts should ===(Set("to-all", "to-random"))
}
}
}
}

View file

@ -241,3 +241,46 @@ When comparing two version vectors `v1` and `v2`:
* `v1`is BEFORE `v2` iff for all i v1(i) <= v2(i) and there exist a j such that v1(j) < v2(j) * `v1`is BEFORE `v2` iff for all i v1(i) <= v2(i) and there exist a j such that v1(j) < v2(j)
* `v1`is AFTER `v2` iff for all i v1(i) >= v2(i) and there exist a j such that v1(j) > v2(j) * `v1`is AFTER `v2` iff for all i v1(i) >= v2(i) and there exist a j such that v1(j) > v2(j)
* `v1`is CONCURRENT with `v2` otherwise * `v1`is CONCURRENT with `v2` otherwise
## Sharded Active Active entities
To simplify what probably are the most common use cases for how you will want to distribute the active active actors there is a minimal API for running multiple instances of @ref[Akka Cluster Sharding](cluster-sharding.md),
each instance holding the entities for a single replica.
The distribution of the replicas can be controlled either through cluster roles or using the @ref[multi datacenter](cluster-dc.md) support in Akka Cluster.
The API consists of bootstrapping logic for starting the sharding instances through @apidoc[ActiveActiveShardingExtension] available from the
`akka-cluster-sharding-typed` module.
Scala
: @@snip [ActiveActiveShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingSpec.scala) { #bootstrap }
Java
: @@snip [ActiveActiveShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ActiveActiveShardingTest.java) { #bootstrap }
`init` returns an @apidoc[ActiveActiveSharding] instance which gives access to @apidoc[EntityRef]s for each of the replicas for arbitrary routing logic:
Scala
: @@snip [ActiveActiveShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingSpec.scala) { #all-entity-refs }
Java
: @@snip [ActiveActiveShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ActiveActiveShardingTest.java) { #all-entity-refs }
More advanced routing among the replicas is currently left as an exercise for the reader (or may be covered in a future release [#29281](https://github.com/akka/akka/issues/29281), [#29319](https://github.com/akka/akka/issues/29319)).
## Direct Replication of Events
Normally an event has to be written in the journal and then picked up by the trailing read journal in the other replicas.
As an optimization the active active events can be published across the Akka cluster to the replicas. The read side
query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most
events will arrive at the replicas through the cluster.
To enable this feature you first need to enable event publishing on the `EventSourcedBehavior` with `withEventPublishing`
(FIXME missing Java API) and then enable direct replication through `withDirectReplication()` on @apidoc[ActiveActiveShardingSettings] (if not using
active active sharding the replication can be run standalone by starting the @apidoc[ActiveActiveShardingDirectReplication] actor).
The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written,
the `ActiveActiveShardingDirectReplication` actor subscribes to these events and forwards them to the replicas allowing them
to fast forward the stream of events for the origin replica. (With additional potential future support in journals for fast forwarding [#29311](https://github.com/akka/akka/issues/29311)).