diff --git a/.gitignore b/.gitignore index 2d8c08866b..750f097ba6 100755 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,8 @@ .manager .multi-jvm .project +.sbtserver +.sbtserver.lock .scala_dependencies .scalastyle .settings diff --git a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedServiceRegistrySpec.scala b/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedServiceRegistrySpec.scala deleted file mode 100644 index ed5a9344ad..0000000000 --- a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedServiceRegistrySpec.scala +++ /dev/null @@ -1,267 +0,0 @@ -/** - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package sample.datareplication - -import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.PoisonPill -import akka.actor.Props -import akka.actor.Terminated -import akka.cluster.Cluster -import akka.cluster.ClusterEvent -import akka.cluster.ClusterEvent.LeaderChanged -import akka.cluster.ddata.DistributedData -import akka.cluster.ddata.GSet -import akka.cluster.ddata.ORSet -import akka.cluster.ddata.Replicator.GetReplicaCount -import akka.cluster.ddata.Replicator.ReplicaCount -import akka.cluster.ddata.STMultiNodeSpec -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import com.typesafe.config.ConfigFactory -import akka.cluster.ddata.GSetKey -import akka.cluster.ddata.ORSetKey -import akka.cluster.ddata.Key - -object ReplicatedServiceRegistrySpec extends MultiNodeConfig { - val node1 = role("node-1") - val node2 = role("node-2") - val node3 = role("node-3") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.log-dead-letters-during-shutdown = off - """)) - - class Service extends Actor { - def receive = { - case s: String ⇒ sender() ! self.path.name + ": " + s - } - } - -} - -object ReplicatedServiceRegistry { - import akka.cluster.ddata.Replicator._ - - val props: Props = Props[ReplicatedServiceRegistry] - - /** - * Register a `service` with a `name`. Several services - * can be registered with the same `name`. - * It will be removed when it is terminated. - */ - final case class Register(name: String, service: ActorRef) - /** - * Lookup services registered for a `name`. [[Bindings]] will - * be sent to `sender()`. - */ - final case class Lookup(name: String) - /** - * Reply for [[Lookup]] - */ - final case class Bindings(name: String, services: Set[ActorRef]) - /** - * Published to `System.eventStream` when services are changed. - */ - final case class BindingChanged(name: String, services: Set[ActorRef]) - - final case class ServiceKey(serviceName: String) extends Key[ORSet[ActorRef]](serviceName) - - private val AllServicesKey = GSetKey[ServiceKey]("service-keys") - -} - -class ReplicatedServiceRegistry() extends Actor with ActorLogging { - import akka.cluster.ddata.Replicator._ - import ReplicatedServiceRegistry._ - - val replicator = DistributedData(context.system).replicator - implicit val cluster = Cluster(context.system) - - var keys = Set.empty[ServiceKey] - var services = Map.empty[String, Set[ActorRef]] - var leader = false - - def serviceKey(serviceName: String): ServiceKey = - ServiceKey("service:" + serviceName) - - override def preStart(): Unit = { - replicator ! Subscribe(AllServicesKey, self) - cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[ClusterEvent.LeaderChanged]) - } - - override def postStop(): Unit = { - cluster.unsubscribe(self) - } - - def receive = { - case Register(name, service) ⇒ - val dKey = serviceKey(name) - // store the service names in a separate GSet to be able to - // get notifications of new names - if (!keys(dKey)) - replicator ! Update(AllServicesKey, GSet(), WriteLocal)(_ + dKey) - // add the service - replicator ! Update(dKey, ORSet(), WriteLocal)(_ + service) - - case Lookup(key) ⇒ - sender() ! Bindings(key, services.getOrElse(key, Set.empty)) - - case c @ Changed(AllServicesKey) ⇒ - val newKeys = c.get(AllServicesKey).elements - log.debug("Services changed, added: {}, all: {}", (newKeys -- keys), newKeys) - (newKeys -- keys).foreach { dKey ⇒ - // subscribe to get notifications of when services with this name are added or removed - replicator ! Subscribe(dKey, self) - } - keys = newKeys - - case c @ Changed(ServiceKey(serviceName)) ⇒ - val name = serviceName.split(":").tail.mkString - val newServices = c.get(serviceKey(name)).elements - log.debug("Services changed for name [{}]: {}", name, newServices) - services = services.updated(name, newServices) - context.system.eventStream.publish(BindingChanged(name, newServices)) - if (leader) - newServices.foreach(context.watch) // watch is idempotent - - case LeaderChanged(node) ⇒ - // Let one node (the leader) be responsible for removal of terminated services - // to avoid redundant work and too many death watch notifications. - // It is not critical to only do it from one node. - val wasLeader = leader - leader = node.exists(_ == cluster.selfAddress) - // when used with many (> 500) services you must increase the system message buffer - // `akka.remote.system-message-buffer-size` - if (!wasLeader && leader) - for (refs ← services.valuesIterator; ref ← refs) - context.watch(ref) - else if (wasLeader && !leader) - for (refs ← services.valuesIterator; ref ← refs) - context.unwatch(ref) - - case Terminated(ref) ⇒ - val names = services.collect { case (name, refs) if refs.contains(ref) ⇒ name } - names.foreach { name ⇒ - log.debug("Service with name [{}] terminated: {}", name, ref) - replicator ! Update(serviceKey(name), ORSet(), WriteLocal)(_ - ref) - } - - case _: UpdateResponse[_] ⇒ // ok - } - -} - -class ReplicatedServiceRegistrySpecMultiJvmNode1 extends ReplicatedServiceRegistrySpec -class ReplicatedServiceRegistrySpecMultiJvmNode2 extends ReplicatedServiceRegistrySpec -class ReplicatedServiceRegistrySpecMultiJvmNode3 extends ReplicatedServiceRegistrySpec - -class ReplicatedServiceRegistrySpec extends MultiNodeSpec(ReplicatedServiceRegistrySpec) with STMultiNodeSpec with ImplicitSender { - import ReplicatedServiceRegistrySpec._ - import ReplicatedServiceRegistry._ - - override def initialParticipants = roles.size - - val cluster = Cluster(system) - val registry = system.actorOf(ReplicatedServiceRegistry.props) - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster join node(to).address - } - enterBarrier(from.name + "-joined") - } - - "Demo of a replicated service registry" must { - "join cluster" in within(10.seconds) { - join(node1, node1) - join(node2, node1) - join(node3, node1) - - awaitAssert { - DistributedData(system).replicator ! GetReplicaCount - expectMsg(ReplicaCount(roles.size)) - } - enterBarrier("after-1") - } - - "replicate service entry" in within(10.seconds) { - runOn(node1) { - val a1 = system.actorOf(Props[Service], name = "a1") - registry ! Register("a", a1) - } - - awaitAssert { - val probe = TestProbe() - registry.tell(Lookup("a"), probe.ref) - probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1")) - } - - enterBarrier("after-2") - } - - "replicate updated service entry, and publish to even bus" in { - val probe = TestProbe() - system.eventStream.subscribe(probe.ref, classOf[BindingChanged]) - - runOn(node2) { - val a2 = system.actorOf(Props[Service], name = "a2") - registry ! Register("a", a2) - } - - probe.within(10.seconds) { - probe.expectMsgType[BindingChanged].services.map(_.path.name) should be(Set("a1", "a2")) - registry.tell(Lookup("a"), probe.ref) - probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1", "a2")) - } - - enterBarrier("after-4") - } - - "remove terminated service" in { - val probe = TestProbe() - system.eventStream.subscribe(probe.ref, classOf[BindingChanged]) - - runOn(node2) { - registry.tell(Lookup("a"), probe.ref) - val a2 = probe.expectMsgType[Bindings].services.find(_.path.name == "a2").get - a2 ! PoisonPill - } - - probe.within(10.seconds) { - probe.expectMsgType[BindingChanged].services.map(_.path.name) should be(Set("a1")) - registry.tell(Lookup("a"), probe.ref) - probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1")) - } - - enterBarrier("after-5") - } - - "replicate many service entries" in within(10.seconds) { - for (i ← 100 until 200) { - val service = system.actorOf(Props[Service], name = myself.name + "_" + i) - registry ! Register("a" + i, service) - } - - awaitAssert { - val probe = TestProbe() - for (i ← 100 until 200) { - registry.tell(Lookup("a" + i), probe.ref) - probe.expectMsgType[Bindings].services.map(_.path.name) should be(roles.map(_.name + "_" + i).toSet) - } - } - - enterBarrier("after-6") - } - - } - -} - diff --git a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/VotingContestSpec.scala b/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/VotingContestSpec.scala deleted file mode 100644 index 28c5200a38..0000000000 --- a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/VotingContestSpec.scala +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package sample.datareplication - -import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.cluster.Cluster -import akka.cluster.ddata.DistributedData -import akka.cluster.ddata.Flag -import akka.cluster.ddata.PNCounterMap -import akka.cluster.ddata.Replicator.GetReplicaCount -import akka.cluster.ddata.Replicator.ReplicaCount -import akka.cluster.ddata.STMultiNodeSpec -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import com.typesafe.config.ConfigFactory -import akka.cluster.ddata.FlagKey -import akka.cluster.ddata.PNCounterMapKey - -object VotingContestSpec extends MultiNodeConfig { - val node1 = role("node-1") - val node2 = role("node-2") - val node3 = role("node-3") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.log-dead-letters-during-shutdown = off - """)) - -} - -object VotingService { - case object Open - case object OpenAck - case object Close - case object CloseAck - final case class Vote(participant: String) - case object GetVotes - final case class Votes(result: Map[String, BigInt], open: Boolean) - - private final case class GetVotesReq(replyTo: ActorRef) -} - -class VotingService extends Actor { - import akka.cluster.ddata.Replicator._ - import VotingService._ - - val replicator = DistributedData(context.system).replicator - implicit val cluster = Cluster(context.system) - val OpenedKey = FlagKey("contestOpened") - val ClosedKey = FlagKey("contestClosed") - val CountersKey = PNCounterMapKey("contestCounters") - - replicator ! Subscribe(OpenedKey, self) - - def receive = { - case Open ⇒ - replicator ! Update(OpenedKey, Flag(), WriteAll(5.seconds))(_.switchOn) - becomeOpen() - - case c @ Changed(OpenedKey) if c.get(OpenedKey).enabled ⇒ - becomeOpen() - - case GetVotes ⇒ - sender() ! Votes(Map.empty, open = false) - } - - def becomeOpen(): Unit = { - replicator ! Unsubscribe(OpenedKey, self) - replicator ! Subscribe(ClosedKey, self) - context.become(open orElse getVotes(open = true)) - } - - def open: Receive = { - case v @ Vote(participant) ⇒ - val update = Update(CountersKey, PNCounterMap(), WriteLocal, request = Some(v)) { - _.increment(participant, 1) - } - replicator ! update - - case _: UpdateSuccess[_] ⇒ - - case Close ⇒ - replicator ! Update(ClosedKey, Flag(), WriteAll(5.seconds))(_.switchOn) - context.become(getVotes(open = false)) - - case c @ Changed(ClosedKey) if c.get(ClosedKey).enabled ⇒ - context.become(getVotes(open = false)) - } - - def getVotes(open: Boolean): Receive = { - case GetVotes ⇒ - replicator ! Get(CountersKey, ReadAll(3.seconds), Some(GetVotesReq(sender()))) - - case g @ GetSuccess(CountersKey, Some(GetVotesReq(replyTo))) ⇒ - val data = g.get(CountersKey) - replyTo ! Votes(data.entries, open) - - case NotFound(CountersKey, Some(GetVotesReq(replyTo))) ⇒ - replyTo ! Votes(Map.empty, open) - - case _: GetFailure[_] ⇒ - - case _: UpdateSuccess[_] ⇒ - } - -} - -class VotingContestSpecMultiJvmNode1 extends VotingContestSpec -class VotingContestSpecMultiJvmNode2 extends VotingContestSpec -class VotingContestSpecMultiJvmNode3 extends VotingContestSpec - -class VotingContestSpec extends MultiNodeSpec(VotingContestSpec) with STMultiNodeSpec with ImplicitSender { - import VotingContestSpec._ - - override def initialParticipants = roles.size - - val cluster = Cluster(system) - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster join node(to).address - } - enterBarrier(from.name + "-joined") - } - - "Demo of a replicated voting" must { - - "join cluster" in within(10.seconds) { - join(node1, node1) - join(node2, node1) - join(node3, node1) - - awaitAssert { - DistributedData(system).replicator ! GetReplicaCount - expectMsg(ReplicaCount(roles.size)) - } - enterBarrier("after-1") - } - - "count votes correctly" in within(15.seconds) { - import VotingService._ - val votingService = system.actorOf(Props[VotingService], "votingService") - val N = 1000 - runOn(node1) { - votingService ! Open - for (n ← 1 to N) { - votingService ! Vote("#" + ((n % 20) + 1)) - } - } - runOn(node2, node3) { - // wait for it to open - val p = TestProbe() - awaitAssert { - votingService.tell(GetVotes, p.ref) - p.expectMsgPF(3.seconds) { case Votes(_, true) ⇒ true } - } - for (n ← 1 to N) { - votingService ! Vote("#" + ((n % 20) + 1)) - } - } - enterBarrier("voting-done") - runOn(node3) { - votingService ! Close - } - - val expected = (1 to 20).map(n ⇒ "#" + n -> BigInt(3L * N / 20)).toMap - awaitAssert { - votingService ! GetVotes - expectMsg(3.seconds, Votes(expected, false)) - } - - enterBarrier("after-2") - } - } - -} - diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/sample/LotsOfDataBot.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LotsOfDataBot.scala similarity index 88% rename from akka-distributed-data/src/test/scala/akka/cluster/ddata/sample/LotsOfDataBot.scala rename to akka-distributed-data/src/test/scala/akka/cluster/ddata/LotsOfDataBot.scala index 1e11941473..4cbbafb0d5 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/sample/LotsOfDataBot.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LotsOfDataBot.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2015 Typesafe Inc. */ -package akka.cluster.ddata.sample +package akka.cluster.ddata import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom @@ -10,12 +10,19 @@ import akka.actor.ActorLogging import akka.actor.ActorSystem import akka.actor.Props import akka.cluster.Cluster -import akka.cluster.ddata.DistributedData -import akka.cluster.ddata.ORSet +import akka.cluster.ddata.Replicator.Changed +import akka.cluster.ddata.Replicator.GetKeyIds +import akka.cluster.ddata.Replicator.GetKeyIdsResult +import akka.cluster.ddata.Replicator.Subscribe +import akka.cluster.ddata.Replicator.Update +import akka.cluster.ddata.Replicator.UpdateResponse +import akka.cluster.ddata.Replicator.WriteLocal import com.typesafe.config.ConfigFactory -import akka.cluster.ddata.Replicator -import akka.cluster.ddata.ORSetKey +/** + * This "sample" simulates lots of data entries, and can be used for + * optimizing replication (e.g. catch-up when adding more nodes). + */ object LotsOfDataBot { def main(args: Array[String]): Unit = { diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/sample/DataBot.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/sample/DataBot.scala deleted file mode 100644 index 22deb14cc9..0000000000 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/sample/DataBot.scala +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.cluster.ddata.sample - -import scala.concurrent.duration._ -import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorSystem -import akka.actor.Props -import akka.cluster.Cluster -import akka.cluster.ddata.DistributedData -import akka.cluster.ddata.ORSet -import com.typesafe.config.ConfigFactory -import akka.cluster.ddata.Replicator -import akka.cluster.ddata.ORSetKey - -object DataBot { - - def main(args: Array[String]): Unit = { - if (args.isEmpty) - startup(Seq("2551", "2552", "0")) - else - startup(args) - } - - def startup(ports: Seq[String]): Unit = { - ports.foreach { port ⇒ - // Override the configuration of the port - val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port). - withFallback(ConfigFactory.load( - ConfigFactory.parseString(""" - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.remote { - netty.tcp { - hostname = "127.0.0.1" - port = 0 - } - } - - akka.cluster { - seed-nodes = [ - "akka.tcp://ClusterSystem@127.0.0.1:2551", - "akka.tcp://ClusterSystem@127.0.0.1:2552"] - - auto-down-unreachable-after = 10s - } - """))) - - // Create an Akka system - val system = ActorSystem("ClusterSystem", config) - // Create an actor that handles cluster domain events - system.actorOf(Props[DataBot], name = "dataBot") - } - } - - private case object Tick - -} - -class DataBot extends Actor with ActorLogging { - import DataBot._ - import Replicator._ - - val replicator = DistributedData(context.system).replicator - implicit val node = Cluster(context.system) - - import context.dispatcher - val tickTask = context.system.scheduler.schedule(5.seconds, 5.seconds, self, Tick) - - val DataKey = ORSetKey[String]("key") - - replicator ! Subscribe(DataKey, self) - - def receive = { - case Tick ⇒ - val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString - if (ThreadLocalRandom.current().nextBoolean()) { - // add - log.info("Adding: {}", s) - replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ + s) - } else { - // remove - log.info("Removing: {}", s) - replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ - s) - } - - case _: UpdateResponse[_] ⇒ // ignore - - case c @ Changed(DataKey) ⇒ - log.info("Current elements: {}", c.get(DataKey).elements) - } - - override def postStop(): Unit = tickTask.cancel() - -} - diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst index 79a4982ad6..ad89eeeead 100644 --- a/akka-docs/rst/java/distributed-data.rst +++ b/akka-docs/rst/java/distributed-data.rst @@ -178,11 +178,11 @@ Here is an example of using ``writeMajority`` and ``readMajority``: **FIXME convert this example to Java** -.. includecode:: ../../../akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala#read-write-majority +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#read-write-majority -.. includecode:: ../../../akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala#get-cart +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#get-cart -.. includecode:: ../../../akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala#add-item +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#add-item In some rare cases, when performing an ``Update`` it is needed to first try to fetch latest data from other nodes. That can be done by first sending a ``Get`` with ``ReadMajority`` and then continue with @@ -196,7 +196,7 @@ The following example illustrates how to do that: **FIXME convert this example to Java** -.. includecode:: ../../../akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala#remove-item +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#remove-item .. warning:: @@ -333,7 +333,7 @@ use a timestamp value based on something else, for example an increasing version from a database record that is used for optimistic concurrency control. When a data entry is changed the full state of that entry is replicated to other nodes, i.e. -when you update an map the whole map is replicated. Therefore, instead of using one ``ORMap`` +when you update a map the whole map is replicated. Therefore, instead of using one ``ORMap`` with 1000 elements it is more efficient to split that up in 10 top level ``ORMap`` entries with 100 elements each. Top level entries are replicated individually, which has the trade-off that different entries may not be replicated at the same time and you may see @@ -454,13 +454,16 @@ cluster. Data types that need pruning have to implement the ``RemovedNodePruning Samples ======= -**FIXME convert these sampes to Java and activator template** +**FIXME convert these sampes to Java** -* `Replicated Cache <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala>`_ -* `Replicated Metrics <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala>`_ -* `Replicated Service Registry <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedServiceRegistrySpec.scala>`_ -* `VotingService <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/VotingContestSpec.scala>`_ -* `ShoppingCart <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala>`_ +Several interesting samples are included and described in the `Typesafe Activator `_ +tutorial named `Akka Distributed Data Samples with Scala `_. + +* Low Latency Voting Service +* Highly Available Shopping Cart +* Distributed Service Registry +* Replicated Cache +* Replicated Metrics Limitations =========== diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst index 327efd9822..dfa102dde6 100644 --- a/akka-docs/rst/scala/distributed-data.rst +++ b/akka-docs/rst/scala/distributed-data.rst @@ -176,11 +176,11 @@ to 4 nodes and reads from 4 nodes. Here is an example of using ``WriteMajority`` and ``ReadMajority``: -.. includecode:: ../../../akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala#read-write-majority +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#read-write-majority -.. includecode:: ../../../akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala#get-cart +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#get-cart -.. includecode:: ../../../akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala#add-item +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#add-item In some rare cases, when performing an ``Update`` it is needed to first try to fetch latest data from other nodes. That can be done by first sending a ``Get`` with ``ReadMajority`` and then continue with @@ -192,7 +192,7 @@ performed (hence the name observed-removed set). The following example illustrates how to do that: -.. includecode:: ../../../akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala#remove-item +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#remove-item .. warning:: @@ -329,7 +329,7 @@ use a timestamp value based on something else, for example an increasing version from a database record that is used for optimistic concurrency control. When a data entry is changed the full state of that entry is replicated to other nodes, i.e. -when you update an map the whole map is replicated. Therefore, instead of using one ``ORMap`` +when you update a map the whole map is replicated. Therefore, instead of using one ``ORMap`` with 1000 elements it is more efficient to split that up in 10 top level ``ORMap`` entries with 100 elements each. Top level entries are replicated individually, which has the trade-off that different entries may not be replicated at the same time and you may see @@ -442,13 +442,14 @@ cluster. Data types that need pruning have to implement the ``RemovedNodePruning Samples ======= -**FIXME convert these samples to activator template** +Several interesting samples are included and described in the `Typesafe Activator `_ +tutorial named `Akka Distributed Data Samples with Scala `_. -* `Replicated Cache <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala>`_ -* `Replicated Metrics <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala>`_ -* `Replicated Service Registry <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedServiceRegistrySpec.scala>`_ -* `VotingService <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/VotingContestSpec.scala>`_ -* `ShoppingCart <@github@/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala>`_ +* Low Latency Voting Service +* Highly Available Shopping Cart +* Distributed Service Registry +* Replicated Cache +* Replicated Metrics Limitations =========== diff --git a/akka-samples/akka-sample-distributed-data-scala/.gitignore b/akka-samples/akka-sample-distributed-data-scala/.gitignore new file mode 100644 index 0000000000..660c959e44 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/.gitignore @@ -0,0 +1,17 @@ +*# +*.iml +*.ipr +*.iws +*.pyc +*.tm.epoch +*.vim +*-shim.sbt +.idea/ +/project/plugins/project +project/boot +target/ +/logs +.cache +.classpath +.project +.settings \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-scala/COPYING b/akka-samples/akka-sample-distributed-data-scala/COPYING new file mode 100644 index 0000000000..0e259d42c9 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/COPYING @@ -0,0 +1,121 @@ +Creative Commons Legal Code + +CC0 1.0 Universal + + CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE + LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN + ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS + INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES + REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS + PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM + THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED + HEREUNDER. + +Statement of Purpose + +The laws of most jurisdictions throughout the world automatically confer +exclusive Copyright and Related Rights (defined below) upon the creator +and subsequent owner(s) (each and all, an "owner") of an original work of +authorship and/or a database (each, a "Work"). + +Certain owners wish to permanently relinquish those rights to a Work for +the purpose of contributing to a commons of creative, cultural and +scientific works ("Commons") that the public can reliably and without fear +of later claims of infringement build upon, modify, incorporate in other +works, reuse and redistribute as freely as possible in any form whatsoever +and for any purposes, including without limitation commercial purposes. +These owners may contribute to the Commons to promote the ideal of a free +culture and the further production of creative, cultural and scientific +works, or to gain reputation or greater distribution for their Work in +part through the use and efforts of others. + +For these and/or other purposes and motivations, and without any +expectation of additional consideration or compensation, the person +associating CC0 with a Work (the "Affirmer"), to the extent that he or she +is an owner of Copyright and Related Rights in the Work, voluntarily +elects to apply CC0 to the Work and publicly distribute the Work under its +terms, with knowledge of his or her Copyright and Related Rights in the +Work and the meaning and intended legal effect of CC0 on those rights. + +1. Copyright and Related Rights. A Work made available under CC0 may be +protected by copyright and related or neighboring rights ("Copyright and +Related Rights"). Copyright and Related Rights include, but are not +limited to, the following: + + i. the right to reproduce, adapt, distribute, perform, display, + communicate, and translate a Work; + ii. moral rights retained by the original author(s) and/or performer(s); +iii. publicity and privacy rights pertaining to a person's image or + likeness depicted in a Work; + iv. rights protecting against unfair competition in regards to a Work, + subject to the limitations in paragraph 4(a), below; + v. rights protecting the extraction, dissemination, use and reuse of data + in a Work; + vi. database rights (such as those arising under Directive 96/9/EC of the + European Parliament and of the Council of 11 March 1996 on the legal + protection of databases, and under any national implementation + thereof, including any amended or successor version of such + directive); and +vii. other similar, equivalent or corresponding rights throughout the + world based on applicable law or treaty, and any national + implementations thereof. + +2. Waiver. To the greatest extent permitted by, but not in contravention +of, applicable law, Affirmer hereby overtly, fully, permanently, +irrevocably and unconditionally waives, abandons, and surrenders all of +Affirmer's Copyright and Related Rights and associated claims and causes +of action, whether now known or unknown (including existing as well as +future claims and causes of action), in the Work (i) in all territories +worldwide, (ii) for the maximum duration provided by applicable law or +treaty (including future time extensions), (iii) in any current or future +medium and for any number of copies, and (iv) for any purpose whatsoever, +including without limitation commercial, advertising or promotional +purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each +member of the public at large and to the detriment of Affirmer's heirs and +successors, fully intending that such Waiver shall not be subject to +revocation, rescission, cancellation, termination, or any other legal or +equitable action to disrupt the quiet enjoyment of the Work by the public +as contemplated by Affirmer's express Statement of Purpose. + +3. Public License Fallback. Should any part of the Waiver for any reason +be judged legally invalid or ineffective under applicable law, then the +Waiver shall be preserved to the maximum extent permitted taking into +account Affirmer's express Statement of Purpose. In addition, to the +extent the Waiver is so judged Affirmer hereby grants to each affected +person a royalty-free, non transferable, non sublicensable, non exclusive, +irrevocable and unconditional license to exercise Affirmer's Copyright and +Related Rights in the Work (i) in all territories worldwide, (ii) for the +maximum duration provided by applicable law or treaty (including future +time extensions), (iii) in any current or future medium and for any number +of copies, and (iv) for any purpose whatsoever, including without +limitation commercial, advertising or promotional purposes (the +"License"). The License shall be deemed effective as of the date CC0 was +applied by Affirmer to the Work. Should any part of the License for any +reason be judged legally invalid or ineffective under applicable law, such +partial invalidity or ineffectiveness shall not invalidate the remainder +of the License, and in such case Affirmer hereby affirms that he or she +will not (i) exercise any of his or her remaining Copyright and Related +Rights in the Work or (ii) assert any associated claims and causes of +action with respect to the Work, in either case contrary to Affirmer's +express Statement of Purpose. + +4. Limitations and Disclaimers. + + a. No trademark or patent rights held by Affirmer are waived, abandoned, + surrendered, licensed or otherwise affected by this document. + b. Affirmer offers the Work as-is and makes no representations or + warranties of any kind concerning the Work, express, implied, + statutory or otherwise, including without limitation warranties of + title, merchantability, fitness for a particular purpose, non + infringement, or the absence of latent or other defects, accuracy, or + the present or absence of errors, whether or not discoverable, all to + the greatest extent permissible under applicable law. + c. Affirmer disclaims responsibility for clearing rights of other persons + that may apply to the Work or any use thereof, including without + limitation any person's Copyright and Related Rights in the Work. + Further, Affirmer disclaims responsibility for obtaining any necessary + consents, permissions or other rights required for any use of the + Work. + d. Affirmer understands and acknowledges that Creative Commons is not a + party to this document and has no duty or obligation with respect to + this CC0 or use of the Work. diff --git a/akka-samples/akka-sample-distributed-data-scala/LICENSE b/akka-samples/akka-sample-distributed-data-scala/LICENSE new file mode 100644 index 0000000000..287f8dd7fa --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/LICENSE @@ -0,0 +1,10 @@ +Activator Template by Typesafe + +Licensed under Public Domain (CC0) + +To the extent possible under law, the person who associated CC0 with +this Activator Tempate has waived all copyright and related or neighboring +rights to this Activator Template. + +You should have received a copy of the CC0 legalcode along with this +work. If not, see . diff --git a/akka-samples/akka-sample-distributed-data-scala/activator.properties b/akka-samples/akka-sample-distributed-data-scala/activator.properties new file mode 100644 index 0000000000..55ffb489d7 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/activator.properties @@ -0,0 +1,7 @@ +name=akka-sample-distributed-data-scala +title=Akka Distributed Data Samples with Scala +description=Akka Distributed Data Samples with Scala +tags=akka,cluster,scala,sample,distributed-data +authorName=Akka Team +authorLink=http://akka.io/ +sourceLink=https://github.com/akka/akka diff --git a/akka-samples/akka-sample-distributed-data-scala/build.sbt b/akka-samples/akka-sample-distributed-data-scala/build.sbt new file mode 100644 index 0000000000..e19c115141 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/build.sbt @@ -0,0 +1,46 @@ +import com.typesafe.sbt.SbtMultiJvm +import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm + +val akkaVersion = "2.4-SNAPSHOT" + +val project = Project( + id = "akka-sample-distributed-data-scala", + base = file("."), + settings = Project.defaultSettings ++ SbtMultiJvm.multiJvmSettings ++ Seq( + name := "akka-sample-distributed-data-scala", + version := "2.4-SNAPSHOT", + scalaVersion := "2.11.6", + scalacOptions in Compile ++= Seq("-encoding", "UTF-8", "-target:jvm-1.8", "-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"), + javacOptions in Compile ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint:unchecked", "-Xlint:deprecation"), + libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-remote" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster" % akkaVersion, + "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion, + "org.scalatest" %% "scalatest" % "2.2.1" % "test"), + javaOptions in run ++= Seq( + "-Xms128m", "-Xmx1024m"), + Keys.fork in run := true, + // make sure that MultiJvm test are compiled by the default test compilation + compile in MultiJvm <<= (compile in MultiJvm) triggeredBy (compile in Test), + // disable parallel tests + parallelExecution in Test := false, + // make sure that MultiJvm tests are executed by the default test target, + // and combine the results from ordinary test and multi-jvm tests + executeTests in Test <<= (executeTests in Test, executeTests in MultiJvm) map { + case (testResults, multiNodeResults) => + val overall = + if (testResults.overall.id < multiNodeResults.overall.id) + multiNodeResults.overall + else + testResults.overall + Tests.Output(overall, + testResults.events ++ multiNodeResults.events, + testResults.summaries ++ multiNodeResults.summaries) + } + ) +) configs (MultiJvm) + + +fork in run := true \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-scala/project/build.properties b/akka-samples/akka-sample-distributed-data-scala/project/build.properties new file mode 100644 index 0000000000..748703f770 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.7 diff --git a/akka-samples/akka-sample-distributed-data-scala/project/plugins.sbt b/akka-samples/akka-sample-distributed-data-scala/project/plugins.sbt new file mode 100644 index 0000000000..c3e7d797de --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/project/plugins.sbt @@ -0,0 +1,4 @@ + +resolvers += Classpaths.typesafeResolver + +addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.8") diff --git a/akka-samples/akka-sample-distributed-data-scala/project/sbt-ui.sbt b/akka-samples/akka-sample-distributed-data-scala/project/sbt-ui.sbt new file mode 100644 index 0000000000..7c28b97b34 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/project/sbt-ui.sbt @@ -0,0 +1,3 @@ +// This plugin represents functionality that is to be added to sbt in the future + +addSbtPlugin("org.scala-sbt" % "sbt-core-next" % "0.1.1") \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-scala/src/main/resources/application.conf b/akka-samples/akka-sample-distributed-data-scala/src/main/resources/application.conf new file mode 100644 index 0000000000..5bdd1d8522 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/resources/application.conf @@ -0,0 +1,21 @@ +akka { + actor { + provider = "akka.cluster.ClusterActorRefProvider" + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + + cluster { + seed-nodes = [ + "akka.tcp://ClusterSystem@127.0.0.1:2551", + "akka.tcp://ClusterSystem@127.0.0.1:2552"] + + auto-down-unreachable-after = 10s + } +} + diff --git a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedCache.scala b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedCache.scala new file mode 100644 index 0000000000..3187449bfa --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedCache.scala @@ -0,0 +1,53 @@ +package sample.distributeddata + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.LWWMap +import akka.cluster.ddata.LWWMapKey + +object ReplicatedCache { + import akka.cluster.ddata.Replicator._ + + def props: Props = Props[ReplicatedCache] + + private final case class Request(key: String, replyTo: ActorRef) + + final case class PutInCache(key: String, value: Any) + final case class GetFromCache(key: String) + final case class Cached(key: String, value: Option[Any]) + final case class Evict(key: String) +} + +class ReplicatedCache extends Actor { + import akka.cluster.ddata.Replicator._ + import ReplicatedCache._ + + val replicator = DistributedData(context.system).replicator + implicit val cluster = Cluster(context.system) + + def dataKey(entryKey: String): LWWMapKey[Any] = + LWWMapKey("cache-" + math.abs(entryKey.hashCode) % 100) + + def receive = { + case PutInCache(key, value) ⇒ + replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ + (key -> value)) + case Evict(key) ⇒ + replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ - key) + case GetFromCache(key) ⇒ + replicator ! Get(dataKey(key), ReadLocal, Some(Request(key, sender()))) + case g @ GetSuccess(LWWMapKey(_), Some(Request(key, replyTo))) ⇒ + g.dataValue match { + case data: LWWMap[_] ⇒ data.get(key) match { + case Some(value) ⇒ replyTo ! Cached(key, Some(value)) + case None ⇒ replyTo ! Cached(key, None) + } + } + case NotFound(_, Some(Request(key, replyTo))) ⇒ + replyTo ! Cached(key, None) + case _: UpdateResponse[_] ⇒ // ok + } + +} diff --git a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala similarity index 55% rename from akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala rename to akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala index 2fddea2f7e..f4c0b140d9 100644 --- a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala @@ -1,42 +1,21 @@ -/** - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package sample.datareplication +package sample.distributeddata import java.lang.management.ManagementFactory import java.lang.management.MemoryMXBean import scala.concurrent.duration._ +import scala.concurrent.duration.FiniteDuration import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.Address import akka.actor.Props import akka.cluster.Cluster -import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberUp, MemberRemoved } +import akka.cluster.ClusterEvent.InitialStateAsEvents +import akka.cluster.ClusterEvent.MemberRemoved +import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ddata.DistributedData import akka.cluster.ddata.LWWMap -import akka.cluster.ddata.Replicator.GetReplicaCount -import akka.cluster.ddata.Replicator.ReplicaCount -import akka.cluster.ddata.STMultiNodeSpec -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import com.typesafe.config.ConfigFactory import akka.cluster.ddata.LWWMapKey -object ReplicatedMetricsSpec extends MultiNodeConfig { - val node1 = role("node-1") - val node2 = role("node-2") - val node3 = role("node-3") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.log-dead-letters-during-shutdown = off - """)) - -} - object ReplicatedMetrics { import akka.cluster.ddata.Replicator._ @@ -132,69 +111,3 @@ class ReplicatedMetrics(measureInterval: FiniteDuration, cleanupInterval: Finite } } - -class ReplicatedMetricsSpecMultiJvmNode1 extends ReplicatedMetricsSpec -class ReplicatedMetricsSpecMultiJvmNode2 extends ReplicatedMetricsSpec -class ReplicatedMetricsSpecMultiJvmNode3 extends ReplicatedMetricsSpec - -class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with STMultiNodeSpec with ImplicitSender { - import ReplicatedMetricsSpec._ - import ReplicatedMetrics._ - - override def initialParticipants = roles.size - - val cluster = Cluster(system) - val replicatedMetrics = system.actorOf(ReplicatedMetrics.props(1.second, 3.seconds)) - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster join node(to).address - } - enterBarrier(from.name + "-joined") - } - - "Demo of a replicated metrics" must { - "join cluster" in within(10.seconds) { - join(node1, node1) - join(node2, node1) - join(node3, node1) - - awaitAssert { - DistributedData(system).replicator ! GetReplicaCount - expectMsg(ReplicaCount(roles.size)) - } - enterBarrier("after-1") - } - - "replicate metrics" in within(10.seconds) { - val probe = TestProbe() - system.eventStream.subscribe(probe.ref, classOf[UsedHeap]) - awaitAssert { - probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) - } - probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) - probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) - enterBarrier("after-2") - } - - "cleanup removed node" in within(15.seconds) { - val node3Address = node(node3).address - runOn(node1) { - cluster.leave(node3Address) - } - runOn(node1, node2) { - val probe = TestProbe() - system.eventStream.subscribe(probe.ref, classOf[UsedHeap]) - awaitAssert { - probe.expectMsgType[UsedHeap].percentPerNode.size should be(2) - } - probe.expectMsgType[UsedHeap].percentPerNode should not contain ( - nodeKey(node3Address)) - } - enterBarrier("after-3") - } - - } - -} - diff --git a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ServiceRegistry.scala b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ServiceRegistry.scala new file mode 100644 index 0000000000..dfe17559e5 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ServiceRegistry.scala @@ -0,0 +1,127 @@ +package sample.distributeddata + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.Terminated +import akka.cluster.Cluster +import akka.cluster.ClusterEvent +import akka.cluster.ClusterEvent.LeaderChanged +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.GSet +import akka.cluster.ddata.GSetKey +import akka.cluster.ddata.Key +import akka.cluster.ddata.ORSet + +object ServiceRegistry { + import akka.cluster.ddata.Replicator._ + + val props: Props = Props[ServiceRegistry] + + /** + * Register a `service` with a `name`. Several services + * can be registered with the same `name`. + * It will be removed when it is terminated. + */ + final case class Register(name: String, service: ActorRef) + /** + * Lookup services registered for a `name`. [[Bindings]] will + * be sent to `sender()`. + */ + final case class Lookup(name: String) + /** + * Reply for [[Lookup]] + */ + final case class Bindings(name: String, services: Set[ActorRef]) + /** + * Published to `System.eventStream` when services are changed. + */ + final case class BindingChanged(name: String, services: Set[ActorRef]) + + final case class ServiceKey(serviceName: String) extends Key[ORSet[ActorRef]](serviceName) + + private val AllServicesKey = GSetKey[ServiceKey]("service-keys") + +} + +class ServiceRegistry extends Actor with ActorLogging { + import akka.cluster.ddata.Replicator._ + import ServiceRegistry._ + + val replicator = DistributedData(context.system).replicator + implicit val cluster = Cluster(context.system) + + var keys = Set.empty[ServiceKey] + var services = Map.empty[String, Set[ActorRef]] + var leader = false + + def serviceKey(serviceName: String): ServiceKey = + ServiceKey("service:" + serviceName) + + override def preStart(): Unit = { + replicator ! Subscribe(AllServicesKey, self) + cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[ClusterEvent.LeaderChanged]) + } + + override def postStop(): Unit = { + cluster.unsubscribe(self) + } + + def receive = { + case Register(name, service) ⇒ + val dKey = serviceKey(name) + // store the service names in a separate GSet to be able to + // get notifications of new names + if (!keys(dKey)) + replicator ! Update(AllServicesKey, GSet(), WriteLocal)(_ + dKey) + // add the service + replicator ! Update(dKey, ORSet(), WriteLocal)(_ + service) + + case Lookup(key) ⇒ + sender() ! Bindings(key, services.getOrElse(key, Set.empty)) + + case c @ Changed(AllServicesKey) ⇒ + val newKeys = c.get(AllServicesKey).elements + log.debug("Services changed, added: {}, all: {}", (newKeys -- keys), newKeys) + (newKeys -- keys).foreach { dKey ⇒ + // subscribe to get notifications of when services with this name are added or removed + replicator ! Subscribe(dKey, self) + } + keys = newKeys + + case c @ Changed(ServiceKey(serviceName)) ⇒ + val name = serviceName.split(":").tail.mkString + val newServices = c.get(serviceKey(name)).elements + log.debug("Services changed for name [{}]: {}", name, newServices) + services = services.updated(name, newServices) + context.system.eventStream.publish(BindingChanged(name, newServices)) + if (leader) + newServices.foreach(context.watch) // watch is idempotent + + case LeaderChanged(node) ⇒ + // Let one node (the leader) be responsible for removal of terminated services + // to avoid redundant work and too many death watch notifications. + // It is not critical to only do it from one node. + val wasLeader = leader + leader = node.exists(_ == cluster.selfAddress) + // when used with many (> 500) services you must increase the system message buffer + // `akka.remote.system-message-buffer-size` + if (!wasLeader && leader) + for (refs ← services.valuesIterator; ref ← refs) + context.watch(ref) + else if (wasLeader && !leader) + for (refs ← services.valuesIterator; ref ← refs) + context.unwatch(ref) + + case Terminated(ref) ⇒ + val names = services.collect { case (name, refs) if refs.contains(ref) ⇒ name } + names.foreach { name ⇒ + log.debug("Service with name [{}] terminated: {}", name, ref) + replicator ! Update(serviceKey(name), ORSet(), WriteLocal)(_ - ref) + } + + case _: UpdateResponse[_] ⇒ // ok + } + +} diff --git a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala similarity index 54% rename from akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala rename to akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala index 250e02ae49..97cd57163c 100644 --- a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedShoppingCartSpec.scala +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala @@ -1,7 +1,4 @@ -/** - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package sample.datareplication +package sample.distributeddata import scala.concurrent.duration._ import akka.actor.Actor @@ -10,29 +7,8 @@ import akka.actor.Props import akka.cluster.Cluster import akka.cluster.ddata.DistributedData import akka.cluster.ddata.LWWMap -import akka.cluster.ddata.Replicator.GetReplicaCount -import akka.cluster.ddata.Replicator.ReplicaCount -import akka.cluster.ddata.STMultiNodeSpec -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import com.typesafe.config.ConfigFactory import akka.cluster.ddata.LWWMapKey -object ReplicatedShoppingCartSpec extends MultiNodeConfig { - val node1 = role("node-1") - val node2 = role("node-2") - val node3 = role("node-3") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.log-dead-letters-during-shutdown = off - """)) - -} - object ShoppingCart { import akka.cluster.ddata.Replicator._ @@ -139,76 +115,3 @@ class ShoppingCart(userId: String) extends Actor { } } - -class ReplicatedShoppingCartSpecMultiJvmNode1 extends ReplicatedShoppingCartSpec -class ReplicatedShoppingCartSpecMultiJvmNode2 extends ReplicatedShoppingCartSpec -class ReplicatedShoppingCartSpecMultiJvmNode3 extends ReplicatedShoppingCartSpec - -class ReplicatedShoppingCartSpec extends MultiNodeSpec(ReplicatedShoppingCartSpec) with STMultiNodeSpec with ImplicitSender { - import ReplicatedShoppingCartSpec._ - import ShoppingCart._ - - override def initialParticipants = roles.size - - val cluster = Cluster(system) - val shoppingCart = system.actorOf(ShoppingCart.props("user-1")) - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster join node(to).address - } - enterBarrier(from.name + "-joined") - } - - "Demo of a replicated shopping cart" must { - "join cluster" in within(10.seconds) { - join(node1, node1) - join(node2, node1) - join(node3, node1) - - awaitAssert { - DistributedData(system).replicator ! GetReplicaCount - expectMsg(ReplicaCount(roles.size)) - } - enterBarrier("after-1") - } - - "handle updates directly after start" in within(15.seconds) { - runOn(node2) { - shoppingCart ! ShoppingCart.AddItem(LineItem("1", "Apples", quantity = 2)) - shoppingCart ! ShoppingCart.AddItem(LineItem("2", "Oranges", quantity = 3)) - } - enterBarrier("updates-done") - - awaitAssert { - shoppingCart ! ShoppingCart.GetCart - val cart = expectMsgType[Cart] - cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3))) - } - - enterBarrier("after-2") - } - - "handle updates from different nodes" in within(5.seconds) { - runOn(node2) { - shoppingCart ! ShoppingCart.AddItem(LineItem("1", "Apples", quantity = 5)) - shoppingCart ! ShoppingCart.RemoveItem("2") - } - runOn(node3) { - shoppingCart ! ShoppingCart.AddItem(LineItem("3", "Bananas", quantity = 4)) - } - enterBarrier("updates-done") - - awaitAssert { - shoppingCart ! ShoppingCart.GetCart - val cart = expectMsgType[Cart] - cart.items should be(Set(LineItem("1", "Apples", quantity = 7), LineItem("3", "Bananas", quantity = 4))) - } - - enterBarrier("after-3") - } - - } - -} - diff --git a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/VotingService.scala b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/VotingService.scala new file mode 100644 index 0000000000..22e9c8bee4 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/VotingService.scala @@ -0,0 +1,88 @@ +package sample.distributeddata + +import scala.concurrent.duration._ +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.FlagKey +import akka.actor.Actor +import akka.cluster.ddata.PNCounterMapKey +import akka.actor.ActorRef +import akka.cluster.Cluster +import akka.cluster.ddata.PNCounterMap +import akka.cluster.ddata.Flag + +object VotingService { + case object Open + case object OpenAck + case object Close + case object CloseAck + final case class Vote(participant: String) + case object GetVotes + final case class Votes(result: Map[String, BigInt], open: Boolean) + + private final case class GetVotesReq(replyTo: ActorRef) +} + +class VotingService extends Actor { + import akka.cluster.ddata.Replicator._ + import VotingService._ + + val replicator = DistributedData(context.system).replicator + implicit val cluster = Cluster(context.system) + val OpenedKey = FlagKey("contestOpened") + val ClosedKey = FlagKey("contestClosed") + val CountersKey = PNCounterMapKey("contestCounters") + + replicator ! Subscribe(OpenedKey, self) + + def receive = { + case Open ⇒ + replicator ! Update(OpenedKey, Flag(), WriteAll(5.seconds))(_.switchOn) + becomeOpen() + + case c @ Changed(OpenedKey) if c.get(OpenedKey).enabled ⇒ + becomeOpen() + + case GetVotes ⇒ + sender() ! Votes(Map.empty, open = false) + } + + def becomeOpen(): Unit = { + replicator ! Unsubscribe(OpenedKey, self) + replicator ! Subscribe(ClosedKey, self) + context.become(open orElse getVotes(open = true)) + } + + def open: Receive = { + case v @ Vote(participant) ⇒ + val update = Update(CountersKey, PNCounterMap(), WriteLocal, request = Some(v)) { + _.increment(participant, 1) + } + replicator ! update + + case _: UpdateSuccess[_] ⇒ + + case Close ⇒ + replicator ! Update(ClosedKey, Flag(), WriteAll(5.seconds))(_.switchOn) + context.become(getVotes(open = false)) + + case c @ Changed(ClosedKey) if c.get(ClosedKey).enabled ⇒ + context.become(getVotes(open = false)) + } + + def getVotes(open: Boolean): Receive = { + case GetVotes ⇒ + replicator ! Get(CountersKey, ReadAll(3.seconds), Some(GetVotesReq(sender()))) + + case g @ GetSuccess(CountersKey, Some(GetVotesReq(replyTo))) ⇒ + val data = g.get(CountersKey) + replyTo ! Votes(data.entries, open) + + case NotFound(CountersKey, Some(GetVotesReq(replyTo))) ⇒ + replyTo ! Votes(Map.empty, open) + + case _: GetFailure[_] ⇒ + + case _: UpdateSuccess[_] ⇒ + } + +} diff --git a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala similarity index 66% rename from akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala rename to akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala index e594b48958..5be8ab0c00 100644 --- a/akka-distributed-data/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala +++ b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala @@ -1,24 +1,15 @@ -/** - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package sample.datareplication +package sample.distributeddata import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props import akka.cluster.Cluster import akka.cluster.ddata.DistributedData -import akka.cluster.ddata.LWWMap import akka.cluster.ddata.Replicator.GetReplicaCount import akka.cluster.ddata.Replicator.ReplicaCount -import akka.cluster.ddata.STMultiNodeSpec import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import com.typesafe.config.ConfigFactory -import akka.cluster.ddata.LWWMapKey object ReplicatedCacheSpec extends MultiNodeConfig { val node1 = role("node-1") @@ -33,50 +24,6 @@ object ReplicatedCacheSpec extends MultiNodeConfig { } -object ReplicatedCache { - import akka.cluster.ddata.Replicator._ - - def props: Props = Props[ReplicatedCache] - - private final case class Request(key: String, replyTo: ActorRef) - - final case class PutInCache(key: String, value: Any) - final case class GetFromCache(key: String) - final case class Cached(key: String, value: Option[Any]) - final case class Evict(key: String) -} - -class ReplicatedCache() extends Actor { - import akka.cluster.ddata.Replicator._ - import ReplicatedCache._ - - val replicator = DistributedData(context.system).replicator - implicit val cluster = Cluster(context.system) - - def dataKey(entryKey: String): LWWMapKey[Any] = - LWWMapKey("cache-" + math.abs(entryKey.hashCode) % 100) - - def receive = { - case PutInCache(key, value) ⇒ - replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ + (key -> value)) - case Evict(key) ⇒ - replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ - key) - case GetFromCache(key) ⇒ - replicator ! Get(dataKey(key), ReadLocal, Some(Request(key, sender()))) - case g @ GetSuccess(LWWMapKey(_), Some(Request(key, replyTo))) ⇒ - g.dataValue match { - case data: LWWMap[_] ⇒ data.get(key) match { - case Some(value) ⇒ replyTo ! Cached(key, Some(value)) - case None ⇒ replyTo ! Cached(key, None) - } - } - case NotFound(_, Some(Request(key, replyTo))) ⇒ - replyTo ! Cached(key, None) - case _: UpdateResponse[_] ⇒ // ok - } - -} - class ReplicatedCacheSpecMultiJvmNode1 extends ReplicatedCacheSpec class ReplicatedCacheSpecMultiJvmNode2 extends ReplicatedCacheSpec class ReplicatedCacheSpecMultiJvmNode3 extends ReplicatedCacheSpec diff --git a/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala new file mode 100644 index 0000000000..713c7ef63f --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala @@ -0,0 +1,91 @@ +package sample.distributeddata + +import scala.concurrent.duration._ +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object ReplicatedMetricsSpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + +} + +class ReplicatedMetricsSpecMultiJvmNode1 extends ReplicatedMetricsSpec +class ReplicatedMetricsSpecMultiJvmNode2 extends ReplicatedMetricsSpec +class ReplicatedMetricsSpecMultiJvmNode3 extends ReplicatedMetricsSpec + +class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with STMultiNodeSpec with ImplicitSender { + import ReplicatedMetricsSpec._ + import ReplicatedMetrics._ + + override def initialParticipants = roles.size + + val cluster = Cluster(system) + val replicatedMetrics = system.actorOf(ReplicatedMetrics.props(1.second, 3.seconds)) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated metrics" must { + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "replicate metrics" in within(10.seconds) { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[UsedHeap]) + awaitAssert { + probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) + } + probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) + probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) + enterBarrier("after-2") + } + + "cleanup removed node" in within(15.seconds) { + val node3Address = node(node3).address + runOn(node1) { + cluster.leave(node3Address) + } + runOn(node1, node2) { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[UsedHeap]) + awaitAssert { + probe.expectMsgType[UsedHeap].percentPerNode.size should be(2) + } + probe.expectMsgType[UsedHeap].percentPerNode should not contain ( + nodeKey(node3Address)) + } + enterBarrier("after-3") + } + + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala new file mode 100644 index 0000000000..0daad1df58 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala @@ -0,0 +1,17 @@ +package sample.distributeddata + +import akka.remote.testkit.MultiNodeSpecCallbacks + +import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } +import org.scalatest.Matchers + +/** + * Hooks up MultiNodeSpec with ScalaTest + */ +trait STMultiNodeSpec extends MultiNodeSpecCallbacks + with WordSpecLike with Matchers with BeforeAndAfterAll { + + override def beforeAll() = multiNodeSpecBeforeAll() + + override def afterAll() = multiNodeSpecAfterAll() +} diff --git a/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ServiceRegistrySpec.scala b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ServiceRegistrySpec.scala new file mode 100644 index 0000000000..5cdddfa689 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ServiceRegistrySpec.scala @@ -0,0 +1,141 @@ +package sample.distributeddata + +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.PoisonPill +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object ServiceRegistrySpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + + class Service extends Actor { + def receive = { + case s: String ⇒ sender() ! self.path.name + ": " + s + } + } + +} + +class ServiceRegistrySpecMultiJvmNode1 extends ServiceRegistrySpec +class ServiceRegistrySpecMultiJvmNode2 extends ServiceRegistrySpec +class ServiceRegistrySpecMultiJvmNode3 extends ServiceRegistrySpec + +class ServiceRegistrySpec extends MultiNodeSpec(ServiceRegistrySpec) with STMultiNodeSpec with ImplicitSender { + import ServiceRegistrySpec._ + import ServiceRegistry._ + + override def initialParticipants = roles.size + + val cluster = Cluster(system) + val registry = system.actorOf(ServiceRegistry.props) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated service registry" must { + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "replicate service entry" in within(10.seconds) { + runOn(node1) { + val a1 = system.actorOf(Props[Service], name = "a1") + registry ! Register("a", a1) + } + + awaitAssert { + val probe = TestProbe() + registry.tell(Lookup("a"), probe.ref) + probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1")) + } + + enterBarrier("after-2") + } + + "replicate updated service entry, and publish to even bus" in { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[BindingChanged]) + + runOn(node2) { + val a2 = system.actorOf(Props[Service], name = "a2") + registry ! Register("a", a2) + } + + probe.within(10.seconds) { + probe.expectMsgType[BindingChanged].services.map(_.path.name) should be(Set("a1", "a2")) + registry.tell(Lookup("a"), probe.ref) + probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1", "a2")) + } + + enterBarrier("after-4") + } + + "remove terminated service" in { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[BindingChanged]) + + runOn(node2) { + registry.tell(Lookup("a"), probe.ref) + val a2 = probe.expectMsgType[Bindings].services.find(_.path.name == "a2").get + a2 ! PoisonPill + } + + probe.within(10.seconds) { + probe.expectMsgType[BindingChanged].services.map(_.path.name) should be(Set("a1")) + registry.tell(Lookup("a"), probe.ref) + probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1")) + } + + enterBarrier("after-5") + } + + "replicate many service entries" in within(10.seconds) { + for (i ← 100 until 200) { + val service = system.actorOf(Props[Service], name = myself.name + "_" + i) + registry ! Register("a" + i, service) + } + + awaitAssert { + val probe = TestProbe() + for (i ← 100 until 200) { + registry.tell(Lookup("a" + i), probe.ref) + probe.expectMsgType[Bindings].services.map(_.path.name) should be(roles.map(_.name + "_" + i).toSet) + } + } + + enterBarrier("after-6") + } + + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala new file mode 100644 index 0000000000..38219ecacd --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala @@ -0,0 +1,98 @@ +package sample.distributeddata + +import scala.concurrent.duration._ +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object ShoppingCartSpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + +} + +class ShoppingCartSpecMultiJvmNode1 extends ShoppingCartSpec +class ShoppingCartSpecMultiJvmNode2 extends ShoppingCartSpec +class ShoppingCartSpecMultiJvmNode3 extends ShoppingCartSpec + +class ShoppingCartSpec extends MultiNodeSpec(ShoppingCartSpec) with STMultiNodeSpec with ImplicitSender { + import ShoppingCartSpec._ + import ShoppingCart._ + + override def initialParticipants = roles.size + + val cluster = Cluster(system) + val shoppingCart = system.actorOf(ShoppingCart.props("user-1")) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated shopping cart" must { + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "handle updates directly after start" in within(15.seconds) { + runOn(node2) { + shoppingCart ! ShoppingCart.AddItem(LineItem("1", "Apples", quantity = 2)) + shoppingCart ! ShoppingCart.AddItem(LineItem("2", "Oranges", quantity = 3)) + } + enterBarrier("updates-done") + + awaitAssert { + shoppingCart ! ShoppingCart.GetCart + val cart = expectMsgType[Cart] + cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3))) + } + + enterBarrier("after-2") + } + + "handle updates from different nodes" in within(5.seconds) { + runOn(node2) { + shoppingCart ! ShoppingCart.AddItem(LineItem("1", "Apples", quantity = 5)) + shoppingCart ! ShoppingCart.RemoveItem("2") + } + runOn(node3) { + shoppingCart ! ShoppingCart.AddItem(LineItem("3", "Bananas", quantity = 4)) + } + enterBarrier("updates-done") + + awaitAssert { + shoppingCart ! ShoppingCart.GetCart + val cart = expectMsgType[Cart] + cart.items should be(Set(LineItem("1", "Apples", quantity = 7), LineItem("3", "Bananas", quantity = 4))) + } + + enterBarrier("after-3") + } + + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala new file mode 100644 index 0000000000..52ed731a4e --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala @@ -0,0 +1,97 @@ +package sample.distributeddata + +import scala.concurrent.duration._ +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object VotingServiceSpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + +} + +class VotingServiceSpecMultiJvmNode1 extends VotingServiceSpec +class VotingServiceSpecMultiJvmNode2 extends VotingServiceSpec +class VotingServiceSpecMultiJvmNode3 extends VotingServiceSpec + +class VotingServiceSpec extends MultiNodeSpec(VotingServiceSpec) with STMultiNodeSpec with ImplicitSender { + import VotingServiceSpec._ + + override def initialParticipants = roles.size + + val cluster = Cluster(system) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated voting" must { + + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "count votes correctly" in within(15.seconds) { + import VotingService._ + val votingService = system.actorOf(Props[VotingService], "votingService") + val N = 1000 + runOn(node1) { + votingService ! Open + for (n ← 1 to N) { + votingService ! Vote("#" + ((n % 20) + 1)) + } + } + runOn(node2, node3) { + // wait for it to open + val p = TestProbe() + awaitAssert { + votingService.tell(GetVotes, p.ref) + p.expectMsgPF(3.seconds) { case Votes(_, true) ⇒ true } + } + for (n ← 1 to N) { + votingService ! Vote("#" + ((n % 20) + 1)) + } + } + enterBarrier("voting-done") + runOn(node3) { + votingService ! Close + } + + val expected = (1 to 20).map(n ⇒ "#" + n -> BigInt(3L * N / 20)).toMap + awaitAssert { + votingService ! GetVotes + expectMsg(3.seconds, Votes(expected, false)) + } + + enterBarrier("after-2") + } + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-scala/tutorial/index.html b/akka-samples/akka-sample-distributed-data-scala/tutorial/index.html new file mode 100644 index 0000000000..71d85524b9 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-scala/tutorial/index.html @@ -0,0 +1,285 @@ + + +Akka Distributed Data Samples with Scala + + + + +
+

+This tutorial contains 5 samples illustrating how to use +Akka Distributed Data. +

+
    +
  • Low Latency Voting Service
  • +
  • Highly Available Shopping Cart
  • +
  • Distributed Service Registry
  • +
  • Replicated Cache
  • +
  • Replicated Metrics
  • +
+ +

+Akka Distributed Data is useful when you need to share data between nodes in an +Akka Cluster. The data is accessed with an actor providing a key-value store like API. +The keys are unique identifiers with type information of the data values. The values +are Conflict Free Replicated Data Types (CRDTs). +

+ +

+All data entries are spread to all nodes, or nodes with a certain role, in the cluster +via direct replication and gossip based dissemination. You have fine grained control +of the consistency level for reads and writes. +

+ +

+The nature CRDTs makes it possible to perform updates from any node without coordination. +Concurrent updates from different nodes will automatically be resolved by the monotonic +merge function, which all data types must provide. The state changes always converge. +Several useful data types for counters, sets, maps and registers are provided and +you can also implement your own custom data types. +

+ +

+It is eventually consistent and geared toward providing high read and write availability +(partition tolerance), with low latency. Note that in an eventually consistent system a read may return an +out-of-date value. +

+ +
+ +
+ +

Low Latency Voting Service

+ +

+Distributed Data is great for low latency services, since you can update or get data from the local replica +without immediate communication with other nodes. +

+ +

+Open VotingService.scala. +

+ +

+VotingService is an actor for low latency counting of votes on several cluster nodes and aggregation +of the grand total number of votes. The actor is started on each cluster node. First it expects an +Open message on one or several nodes. After that the counting can begin. The open +signal is immediately replicated to all nodes with a boolean +Flag. +Note WriteAll. +

+ +

+replicator ! Update(OpenedKey, Flag(), WriteAll(5.seconds))(_.switchOn)
+
+ +

+The actor is subscribing to changes of the OpenedKey and other instances of this actor, +also on other nodes, will be notified when the flag is changed. +

+ +

+replicator ! Subscribe(OpenedKey, self)
+
+ +

+case c @ Changed(OpenedKey) if c.get(OpenedKey).enabled
+
+ +

+The counters are kept in a +PNCounterMap +and updated with: +

+ +

+val update = Update(CountersKey, PNCounterMap(), WriteLocal, request = Some(v)) {
+  _.increment(participant, 1)
+}
+replicator ! update
+
+ +

+Incrementing the counter is very fast, since it only involves communication with the local +Replicator actor. Note WriteLocal. Those updates are also spread +to other nodes, but that is performed in the background. +

+ +

+The total number of votes is retrieved with: +

+ +

+case GetVotes ⇒
+  replicator ! Get(CountersKey, ReadAll(3.seconds), Some(GetVotesReq(sender())))
+
+case g @ GetSuccess(CountersKey, Some(GetVotesReq(replyTo))) ⇒
+  val data = g.get(CountersKey)
+  replyTo ! Votes(data.entries, open)
+
+ +

+The multi-node test for the VotingService can be found in +VotingServiceSpec.scala. +

+ +

+Read the +Using the Replicator +documentation for more details of how to use Get, Update, and Subscribe. + +

+ +
+

Highly Available Shopping Cart

+ +

+Distributed Data is great for highly available services, since it is possible to perform +updates to the local node (or currently available nodes) during a network partition. +

+ +

+Open ShoppingCart.scala. +

+ +

+ShoppingCart is an actor that holds the selected items to buy for a user. +The actor instance for a specific user may be started where ever needed in the cluster, i.e. several +instances may be started on different nodes and used at the same time. +

+ +

+Each product in the cart is represented by a LineItem and all items in the cart +is collected in a LWWMap. +

+ +

+The actor handles the commands GetCart, AddItem and RemoveItem. +To get the latest updates in case the same shopping cart is used from several nodes it is using +consistency level of ReadMajority and WriteMajority, but that is only +done to reduce the risk of seeing old data. If such reads and writes cannot be completed due to a +network partition it falls back to reading/writing from the local replica (see GetFailure). +Local reads and writes will always be successful and when the network partition heals the updated +shopping carts will be be disseminated by the +gossip protocol +and the LWWMap CRDTs are merged, i.e. it is a highly available shopping cart. +

+ +

+The multi-node test for the ShoppingCart can be found in +ShoppingCartSpec.scala. +

+ +

+Read the +Consistency +section in the documentation to understand the consistency considerations. +

+ +
+ +
+

Distributed Service Registry

+ +

+Have you ever had the need to lookup actors by name in an Akka Cluster? +This example illustrates how you could implement such a registry. It is probably not +feature complete, but should be a good starting point. +

+ +

+Open ServiceRegistry.scala. +

+ +

+ServiceRegistry is an actor that is started on each node in the cluster. +It supports two basic commands: +

+
    +
  • Register to bind an ActorRef to a name, + several actors can be bound to the same name
  • +
  • Lookup get currently bound services of a given name
  • +
+ +

+For each named service it is using an +ORSet. +Here we are using top level ORSet entries. An alternative would have been to use a +ORMultiMap holding all services. That would have a disadvantage if we have many services. +When a data entry is changed the full state of that entry is replicated to other nodes, i.e. when you +update a map the whole map is replicated. +

+ +

+The ServiceRegistry is subscribing to changes of a GSet where we add +the names of all services. It is also subscribing to all such service keys to get notifications when +actors are added or removed to a named service. +

+ +

+The multi-node test for the ServiceRegistry can be found in +ServiceRegistrySpec.scala. +

+ +
+ +
+

Replicated Cache

+ +

+This example illustrates a simple key-value cache. +

+ +

+Open ReplicatedCache.scala. +

+ +

+ReplicatedCache is an actor that is started on each node in the cluster. +It supports three commands: PutInCache, GetFromCache and Evict. +

+ +

+It is splitting up the key space in 100 top level keys, each with a LWWMap. +When a data entry is changed the full state of that entry is replicated to other nodes, i.e. when you +update a map the whole map is replicated. Therefore, instead of using one ORMap with 1000 elements it +is more efficient to split that up in 100 top level ORMap entries with 10 elements each. Top level +entries are replicated individually, which has the trade-off that different entries may not be +replicated at the same time and you may see inconsistencies between related entries. +Separate top level entries cannot be updated atomically together. +

+ +

+The multi-node test for the ReplicatedCache can be found in +ReplicatedCacheSpec.scala. +

+ +
+ +
+

Replicated Metrics

+ +

+This example illustrates to spread metrics data to all nodes in an Akka cluster. +

+ +

+Open ReplicatedMetrics.scala. +

+ +

+ReplicatedMetrics is an actor that is started on each node in the cluster. +Periodically it collects some metrics, in this case used and max heap size. +Each metrics type is stored in a LWWMap where the key in the map is the address of +the node. The values are disseminated to other nodes with the gossip protocol. +

+ +

+The multi-node test for the ReplicatedCache can be found in +ReplicatedMetricsSpec.scala. +

+ +
+ + + diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a4b4ec4b92..4e22a54412 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -210,7 +210,8 @@ object AkkaBuild extends Build { Seq(sampleCamelJava, sampleCamelScala, sampleClusterJava, sampleClusterScala, sampleFsmScala, sampleFsmJavaLambda, sampleMainJava, sampleMainScala, sampleMainJavaLambda, sampleMultiNodeScala, samplePersistenceJava, samplePersistenceScala, samplePersistenceJavaLambda, - sampleRemoteJava, sampleRemoteScala, sampleSupervisionJavaLambda) + sampleRemoteJava, sampleRemoteScala, sampleSupervisionJavaLambda, + sampleDistributedDataScala) ) lazy val sampleCamelJava = Sample.project("akka-sample-camel-java") @@ -236,6 +237,8 @@ object AkkaBuild extends Build { lazy val sampleRemoteScala = Sample.project("akka-sample-remote-scala") lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda") + + lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala") lazy val osgiDiningHakkersSampleMavenTest = Project(id = "akka-sample-osgi-dining-hakkers-maven-test", base = file("akka-samples/akka-sample-osgi-dining-hakkers-maven-test"),