=cdd #17779 Create activator template for Distributed Data

* includes the samples, with tutorial text descriptions
Patrik Nordwall 2015-06-29 21:18:39 +02:00
parent 33bc502c76
commit 252e88c082
29 changed files with 1278 additions and 821 deletions


@@ -1,187 +0,0 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.datareplication
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.cluster.ddata.STMultiNodeSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.LWWMapKey
object ReplicatedCacheSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
object ReplicatedCache {
import akka.cluster.ddata.Replicator._
def props: Props = Props[ReplicatedCache]
private final case class Request(key: String, replyTo: ActorRef)
final case class PutInCache(key: String, value: Any)
final case class GetFromCache(key: String)
final case class Cached(key: String, value: Option[Any])
final case class Evict(key: String)
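// Usage sketch (hypothetical client code; the tests below exercise exactly
// this protocol):
//   val cache = system.actorOf(ReplicatedCache.props)
//   cache ! PutInCache("key1", "A")
//   cache ! GetFromCache("key1") // sender receives Cached("key1", Some("A"))
//   cache ! Evict("key1")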
}
class ReplicatedCache() extends Actor {
import akka.cluster.ddata.Replicator._
import ReplicatedCache._
val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system)
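// Entries are spread over up to 100 top-level LWWMap buckets (hash of the
// entry key modulo 100), so an update replicates one small bucket rather
// than the whole cache.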
def dataKey(entryKey: String): LWWMapKey[Any] =
LWWMapKey("cache-" + math.abs(entryKey.hashCode) % 100)
def receive = {
case PutInCache(key, value) =>
replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ + (key -> value))
case Evict(key) =>
replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ - key)
case GetFromCache(key) =>
replicator ! Get(dataKey(key), ReadLocal, Some(Request(key, sender())))
case g @ GetSuccess(LWWMapKey(_), Some(Request(key, replyTo))) =>
g.dataValue match {
case data: LWWMap[_] => data.get(key) match {
case Some(value) => replyTo ! Cached(key, Some(value))
case None => replyTo ! Cached(key, None)
}
}
case NotFound(_, Some(Request(key, replyTo))) =>
replyTo ! Cached(key, None)
case _: UpdateResponse[_] => // ok
}
}
class ReplicatedCacheSpecMultiJvmNode1 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode2 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode3 extends ReplicatedCacheSpec
class ReplicatedCacheSpec extends MultiNodeSpec(ReplicatedCacheSpec) with STMultiNodeSpec with ImplicitSender {
import ReplicatedCacheSpec._
import ReplicatedCache._
override def initialParticipants = roles.size
val cluster = Cluster(system)
val replicatedCache = system.actorOf(ReplicatedCache.props)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated cache" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"replicate cached entry" in within(10.seconds) {
runOn(node1) {
replicatedCache ! PutInCache("key1", "A")
}
awaitAssert {
val probe = TestProbe()
replicatedCache.tell(GetFromCache("key1"), probe.ref)
probe.expectMsg(Cached("key1", Some("A")))
}
enterBarrier("after-2")
}
"replicate many cached entries" in within(10.seconds) {
runOn(node1) {
for (i <- 100 to 200)
replicatedCache ! PutInCache("key" + i, i)
}
awaitAssert {
val probe = TestProbe()
for (i <- 100 to 200) {
replicatedCache.tell(GetFromCache("key" + i), probe.ref)
probe.expectMsg(Cached("key" + i, Some(i)))
}
}
enterBarrier("after-3")
}
"replicate evicted entry" in within(15.seconds) {
runOn(node1) {
replicatedCache ! PutInCache("key2", "B")
}
awaitAssert {
val probe = TestProbe()
replicatedCache.tell(GetFromCache("key2"), probe.ref)
probe.expectMsg(Cached("key2", Some("B")))
}
enterBarrier("key2-replicated")
runOn(node3) {
replicatedCache ! Evict("key2")
}
awaitAssert {
val probe = TestProbe()
replicatedCache.tell(GetFromCache("key2"), probe.ref)
probe.expectMsg(Cached("key2", None))
}
enterBarrier("after-4")
}
"replicate updated cached entry" in within(10.seconds) {
runOn(node2) {
replicatedCache ! PutInCache("key1", "A2")
replicatedCache ! PutInCache("key1", "A3")
}
awaitAssert {
val probe = TestProbe()
replicatedCache.tell(GetFromCache("key1"), probe.ref)
probe.expectMsg(Cached("key1", Some("A3")))
}
enterBarrier("after-5")
}
}
}


@@ -1,200 +0,0 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.datareplication
import java.lang.management.ManagementFactory
import java.lang.management.MemoryMXBean
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Address
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberUp, MemberRemoved }
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.cluster.ddata.STMultiNodeSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.LWWMapKey
object ReplicatedMetricsSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
object ReplicatedMetrics {
import akka.cluster.ddata.Replicator._
def props(measureInterval: FiniteDuration, cleanupInterval: FiniteDuration): Props =
Props(new ReplicatedMetrics(measureInterval, cleanupInterval))
def props: Props = props(1.second, 1.minute)
private case object Tick
private case object Cleanup
case class UsedHeap(percentPerNode: Map[String, Double]) {
override def toString =
percentPerNode.toSeq.sortBy(_._1).map {
case (key, value) => key + " --> " + value + " %"
}.mkString("\n")
}
def nodeKey(address: Address): String = address.host.get + ":" + address.port.get
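// Usage sketch (mirrors the spec below): start the actor and listen for
// UsedHeap events on the system event stream (`listener` is any ActorRef):
//   system.actorOf(ReplicatedMetrics.props(1.second, 3.seconds))
//   system.eventStream.subscribe(listener, classOf[UsedHeap])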
}
class ReplicatedMetrics(measureInterval: FiniteDuration, cleanupInterval: FiniteDuration)
extends Actor with ActorLogging {
import akka.cluster.ddata.Replicator._
import ReplicatedMetrics._
val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system)
val node = nodeKey(cluster.selfAddress)
val tickTask = context.system.scheduler.schedule(measureInterval, measureInterval,
self, Tick)(context.dispatcher)
val cleanupTask = context.system.scheduler.schedule(cleanupInterval, cleanupInterval,
self, Cleanup)(context.dispatcher)
val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
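// One LWWMap per metric, keyed by node address; last-writer-wins is safe
// here because each node only ever writes its own entry.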
val UsedHeapKey = LWWMapKey[Long]("usedHeap")
val MaxHeapKey = LWWMapKey[Long]("maxHeap")
replicator ! Subscribe(UsedHeapKey, self)
replicator ! Subscribe(MaxHeapKey, self)
cluster.subscribe(self, InitialStateAsEvents, classOf[MemberUp], classOf[MemberRemoved])
override def postStop(): Unit = {
tickTask.cancel()
cleanupTask.cancel()
cluster.unsubscribe(self)
super.postStop()
}
var maxHeap = Map.empty[String, Long]
var nodesInCluster = Set.empty[String]
def receive = {
case Tick =>
val heap = memoryMBean.getHeapMemoryUsage
val used = heap.getUsed
val max = heap.getMax
replicator ! Update(UsedHeapKey, LWWMap.empty[Long], WriteLocal)(_ + (node -> used))
replicator ! Update(MaxHeapKey, LWWMap.empty[Long], WriteLocal) { data =>
data.get(node) match {
case Some(`max`) => data // unchanged
case _ => data + (node -> max)
}
}
case c @ Changed(MaxHeapKey) =>
maxHeap = c.get(MaxHeapKey).entries
case c @ Changed(UsedHeapKey) =>
val usedHeapPercent = UsedHeap(c.get(UsedHeapKey).entries.collect {
case (key, value) if maxHeap.contains(key) =>
(key -> (value.toDouble / maxHeap(key)) * 100.0)
})
log.debug("Node {} observed:\n{}", node, usedHeapPercent)
context.system.eventStream.publish(usedHeapPercent)
case _: UpdateResponse[_] => // ok
case MemberUp(m) =>
nodesInCluster += nodeKey(m.address)
case MemberRemoved(m, _) =>
nodesInCluster -= nodeKey(m.address)
case Cleanup =>
def cleanupRemoved(data: LWWMap[Long]): LWWMap[Long] =
(data.entries.keySet -- nodesInCluster).foldLeft(data) { case (d, key) => d - key }
replicator ! Update(UsedHeapKey, LWWMap.empty[Long], WriteLocal)(cleanupRemoved)
replicator ! Update(MaxHeapKey, LWWMap.empty[Long], WriteLocal)(cleanupRemoved)
}
}
}
class ReplicatedMetricsSpecMultiJvmNode1 extends ReplicatedMetricsSpec
class ReplicatedMetricsSpecMultiJvmNode2 extends ReplicatedMetricsSpec
class ReplicatedMetricsSpecMultiJvmNode3 extends ReplicatedMetricsSpec
class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with STMultiNodeSpec with ImplicitSender {
import ReplicatedMetricsSpec._
import ReplicatedMetrics._
override def initialParticipants = roles.size
val cluster = Cluster(system)
val replicatedMetrics = system.actorOf(ReplicatedMetrics.props(1.second, 3.seconds))
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated metrics" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"replicate metrics" in within(10.seconds) {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[UsedHeap])
awaitAssert {
probe.expectMsgType[UsedHeap].percentPerNode.size should be(3)
}
probe.expectMsgType[UsedHeap].percentPerNode.size should be(3)
probe.expectMsgType[UsedHeap].percentPerNode.size should be(3)
enterBarrier("after-2")
}
"cleanup removed node" in within(15.seconds) {
val node3Address = node(node3).address
runOn(node1) {
cluster.leave(node3Address)
}
runOn(node1, node2) {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[UsedHeap])
awaitAssert {
probe.expectMsgType[UsedHeap].percentPerNode.size should be(2)
}
probe.expectMsgType[UsedHeap].percentPerNode.keySet should not contain nodeKey(node3Address)
}
enterBarrier("after-3")
}
}
}


@@ -1,267 +0,0 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.datareplication
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent
import akka.cluster.ClusterEvent.LeaderChanged
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.GSet
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.cluster.ddata.STMultiNodeSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.GSetKey
import akka.cluster.ddata.ORSetKey
import akka.cluster.ddata.Key
object ReplicatedServiceRegistrySpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
class Service extends Actor {
def receive = {
case s: String => sender() ! self.path.name + ": " + s
}
}
}
object ReplicatedServiceRegistry {
import akka.cluster.ddata.Replicator._
val props: Props = Props[ReplicatedServiceRegistry]
/**
* Register a `service` with a `name`. Several services
* can be registered with the same `name`.
* It will be removed when it is terminated.
*/
final case class Register(name: String, service: ActorRef)
/**
* Lookup services registered for a `name`. [[Bindings]] will
* be sent to `sender()`.
*/
final case class Lookup(name: String)
/**
* Reply for [[Lookup]]
*/
final case class Bindings(name: String, services: Set[ActorRef])
/**
* Published to `System.eventStream` when services are changed.
*/
final case class BindingChanged(name: String, services: Set[ActorRef])
final case class ServiceKey(serviceName: String) extends Key[ORSet[ActorRef]](serviceName)
private val AllServicesKey = GSetKey[ServiceKey]("service-keys")
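// Usage sketch (hypothetical client code; the tests below exercise exactly
// this protocol):
//   val registry = system.actorOf(ReplicatedServiceRegistry.props)
//   registry ! Register("a", service) // removed again when `service` terminates
//   registry ! Lookup("a")            // sender receives Bindings("a", services)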
}
class ReplicatedServiceRegistry() extends Actor with ActorLogging {
import akka.cluster.ddata.Replicator._
import ReplicatedServiceRegistry._
val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system)
var keys = Set.empty[ServiceKey]
var services = Map.empty[String, Set[ActorRef]]
var leader = false
def serviceKey(serviceName: String): ServiceKey =
ServiceKey("service:" + serviceName)
override def preStart(): Unit = {
replicator ! Subscribe(AllServicesKey, self)
cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[ClusterEvent.LeaderChanged])
}
override def postStop(): Unit = {
cluster.unsubscribe(self)
}
def receive = {
case Register(name, service) =>
val dKey = serviceKey(name)
// store the service names in a separate GSet to be able to
// get notifications of new names
if (!keys(dKey))
replicator ! Update(AllServicesKey, GSet(), WriteLocal)(_ + dKey)
// add the service
replicator ! Update(dKey, ORSet(), WriteLocal)(_ + service)
case Lookup(key) =>
sender() ! Bindings(key, services.getOrElse(key, Set.empty))
case c @ Changed(AllServicesKey) =>
val newKeys = c.get(AllServicesKey).elements
log.debug("Services changed, added: {}, all: {}", (newKeys -- keys), newKeys)
(newKeys -- keys).foreach { dKey =>
// subscribe to get notifications of when services with this name are added or removed
replicator ! Subscribe(dKey, self)
}
keys = newKeys
case c @ Changed(ServiceKey(serviceName)) =>
val name = serviceName.split(":").tail.mkString
val newServices = c.get(serviceKey(name)).elements
log.debug("Services changed for name [{}]: {}", name, newServices)
services = services.updated(name, newServices)
context.system.eventStream.publish(BindingChanged(name, newServices))
if (leader)
newServices.foreach(context.watch) // watch is idempotent
case LeaderChanged(node) =>
// Let one node (the leader) be responsible for removal of terminated services
// to avoid redundant work and too many death watch notifications.
// It is not critical to only do it from one node.
val wasLeader = leader
leader = node.exists(_ == cluster.selfAddress)
// when used with many (> 500) services you must increase the system message buffer
// `akka.remote.system-message-buffer-size`
if (!wasLeader && leader)
for (refs <- services.valuesIterator; ref <- refs)
context.watch(ref)
else if (wasLeader && !leader)
for (refs <- services.valuesIterator; ref <- refs)
context.unwatch(ref)
case Terminated(ref) =>
val names = services.collect { case (name, refs) if refs.contains(ref) => name }
names.foreach { name =>
log.debug("Service with name [{}] terminated: {}", name, ref)
replicator ! Update(serviceKey(name), ORSet(), WriteLocal)(_ - ref)
}
case _: UpdateResponse[_] => // ok
}
}
}
class ReplicatedServiceRegistrySpecMultiJvmNode1 extends ReplicatedServiceRegistrySpec
class ReplicatedServiceRegistrySpecMultiJvmNode2 extends ReplicatedServiceRegistrySpec
class ReplicatedServiceRegistrySpecMultiJvmNode3 extends ReplicatedServiceRegistrySpec
class ReplicatedServiceRegistrySpec extends MultiNodeSpec(ReplicatedServiceRegistrySpec) with STMultiNodeSpec with ImplicitSender {
import ReplicatedServiceRegistrySpec._
import ReplicatedServiceRegistry._
override def initialParticipants = roles.size
val cluster = Cluster(system)
val registry = system.actorOf(ReplicatedServiceRegistry.props)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated service registry" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"replicate service entry" in within(10.seconds) {
runOn(node1) {
val a1 = system.actorOf(Props[Service], name = "a1")
registry ! Register("a", a1)
}
awaitAssert {
val probe = TestProbe()
registry.tell(Lookup("a"), probe.ref)
probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1"))
}
enterBarrier("after-2")
}
"replicate updated service entry, and publish to even bus" in {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[BindingChanged])
runOn(node2) {
val a2 = system.actorOf(Props[Service], name = "a2")
registry ! Register("a", a2)
}
probe.within(10.seconds) {
probe.expectMsgType[BindingChanged].services.map(_.path.name) should be(Set("a1", "a2"))
registry.tell(Lookup("a"), probe.ref)
probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1", "a2"))
}
enterBarrier("after-4")
}
"remove terminated service" in {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[BindingChanged])
runOn(node2) {
registry.tell(Lookup("a"), probe.ref)
val a2 = probe.expectMsgType[Bindings].services.find(_.path.name == "a2").get
a2 ! PoisonPill
}
probe.within(10.seconds) {
probe.expectMsgType[BindingChanged].services.map(_.path.name) should be(Set("a1"))
registry.tell(Lookup("a"), probe.ref)
probe.expectMsgType[Bindings].services.map(_.path.name) should be(Set("a1"))
}
enterBarrier("after-5")
}
"replicate many service entries" in within(10.seconds) {
for (i <- 100 until 200) {
val service = system.actorOf(Props[Service], name = myself.name + "_" + i)
registry ! Register("a" + i, service)
}
awaitAssert {
val probe = TestProbe()
for (i <- 100 until 200) {
registry.tell(Lookup("a" + i), probe.ref)
probe.expectMsgType[Bindings].services.map(_.path.name) should be(roles.map(_.name + "_" + i).toSet)
}
}
enterBarrier("after-6")
}
}
}


@@ -1,214 +0,0 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.datareplication
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.cluster.ddata.STMultiNodeSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.LWWMapKey
object ReplicatedShoppingCartSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
object ShoppingCart {
import akka.cluster.ddata.Replicator._
def props(userId: String): Props = Props(new ShoppingCart(userId))
case object GetCart
final case class AddItem(item: LineItem)
final case class RemoveItem(productId: String)
final case class Cart(items: Set[LineItem])
final case class LineItem(productId: String, title: String, quantity: Int)
//#read-write-majority
private val timeout = 3.seconds
private val readMajority = ReadMajority(timeout)
private val writeMajority = WriteMajority(timeout)
//#read-write-majority
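// Usage sketch (hypothetical client code; the tests below exercise exactly
// this protocol):
//   val cart = system.actorOf(ShoppingCart.props("user-1"))
//   cart ! AddItem(LineItem("1", "Apples", quantity = 2))
//   cart ! RemoveItem("2")
//   cart ! GetCart // sender receives Cart(items)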
}
class ShoppingCart(userId: String) extends Actor {
import ShoppingCart._
import akka.cluster.ddata.Replicator._
val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system)
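// One LWWMap per user: the top-level key is derived from the userId, so
// different users' carts are replicated independently of each other.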
val DataKey = LWWMapKey[LineItem]("cart-" + userId)
def receive = receiveGetCart
.orElse[Any, Unit](receiveAddItem)
.orElse[Any, Unit](receiveRemoveItem)
.orElse[Any, Unit](receiveOther)
//#get-cart
def receiveGetCart: Receive = {
case GetCart =>
replicator ! Get(DataKey, readMajority, Some(sender()))
case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) =>
val data = g.get(DataKey)
val cart = Cart(data.entries.values.toSet)
replyTo ! cart
case NotFound(DataKey, Some(replyTo: ActorRef)) =>
replyTo ! Cart(Set.empty)
case GetFailure(DataKey, Some(replyTo: ActorRef)) =>
// ReadMajority failure, try again with local read
replicator ! Get(DataKey, ReadLocal, Some(replyTo))
}
//#get-cart
//#add-item
def receiveAddItem: Receive = {
case cmd @ AddItem(item) =>
val update = Update(DataKey, LWWMap.empty[LineItem], writeMajority, Some(cmd)) {
cart => updateCart(cart, item)
}
replicator ! update
case GetFailure(DataKey, Some(AddItem(item))) =>
// ReadMajority of Update failed, fall back to best effort local value
replicator ! Update(DataKey, LWWMap.empty[LineItem], writeMajority, None) {
cart => updateCart(cart, item)
}
}
//#add-item
//#remove-item
def receiveRemoveItem: Receive = {
case cmd @ RemoveItem(productId) =>
// Try to fetch latest from a majority of nodes first, since ORMap
// remove must have seen the item to be able to remove it.
replicator ! Get(DataKey, readMajority, Some(cmd))
case GetSuccess(DataKey, Some(RemoveItem(productId))) =>
replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
_ - productId
}
case GetFailure(DataKey, Some(RemoveItem(productId))) =>
// ReadMajority failed, fall back to best effort local value
replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
_ - productId
}
case NotFound(DataKey, Some(RemoveItem(productId))) =>
// nothing to remove
}
//#remove-item
def receiveOther: Receive = {
case _: UpdateSuccess[_] | _: UpdateTimeout[_] =>
// UpdateTimeout, will eventually be replicated
case e: UpdateFailure[_] => throw new IllegalStateException("Unexpected failure: " + e)
}
def updateCart(data: LWWMap[LineItem], item: LineItem): LWWMap[LineItem] =
data.get(item.productId) match {
case Some(LineItem(_, _, existingQuantity)) =>
data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity))
case None => data + (item.productId -> item)
}
}
class ReplicatedShoppingCartSpecMultiJvmNode1 extends ReplicatedShoppingCartSpec
class ReplicatedShoppingCartSpecMultiJvmNode2 extends ReplicatedShoppingCartSpec
class ReplicatedShoppingCartSpecMultiJvmNode3 extends ReplicatedShoppingCartSpec
class ReplicatedShoppingCartSpec extends MultiNodeSpec(ReplicatedShoppingCartSpec) with STMultiNodeSpec with ImplicitSender {
import ReplicatedShoppingCartSpec._
import ShoppingCart._
override def initialParticipants = roles.size
val cluster = Cluster(system)
val shoppingCart = system.actorOf(ShoppingCart.props("user-1"))
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated shopping cart" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"handle updates directly after start" in within(15.seconds) {
runOn(node2) {
shoppingCart ! ShoppingCart.AddItem(LineItem("1", "Apples", quantity = 2))
shoppingCart ! ShoppingCart.AddItem(LineItem("2", "Oranges", quantity = 3))
}
enterBarrier("updates-done")
awaitAssert {
shoppingCart ! ShoppingCart.GetCart
val cart = expectMsgType[Cart]
cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3)))
}
enterBarrier("after-2")
}
"handle updates from different nodes" in within(5.seconds) {
runOn(node2) {
shoppingCart ! ShoppingCart.AddItem(LineItem("1", "Apples", quantity = 5))
shoppingCart ! ShoppingCart.RemoveItem("2")
}
runOn(node3) {
shoppingCart ! ShoppingCart.AddItem(LineItem("3", "Bananas", quantity = 4))
}
enterBarrier("updates-done")
awaitAssert {
shoppingCart ! ShoppingCart.GetCart
val cart = expectMsgType[Cart]
cart.items should be(Set(LineItem("1", "Apples", quantity = 7), LineItem("3", "Bananas", quantity = 4)))
}
enterBarrier("after-3")
}
}
}


@@ -1,184 +0,0 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.datareplication
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Flag
import akka.cluster.ddata.PNCounterMap
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.cluster.ddata.STMultiNodeSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.FlagKey
import akka.cluster.ddata.PNCounterMapKey
object VotingContestSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
object VotingService {
case object Open
case object OpenAck
case object Close
case object CloseAck
final case class Vote(participant: String)
case object GetVotes
final case class Votes(result: Map[String, BigInt], open: Boolean)
private final case class GetVotesReq(replyTo: ActorRef)
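// Usage sketch (hypothetical client code; the test below exercises exactly
// this protocol):
//   val votingService = system.actorOf(Props[VotingService], "votingService")
//   votingService ! Open
//   votingService ! Vote("#1")
//   votingService ! Close
//   votingService ! GetVotes // sender receives Votes(result, open = false)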
}
class VotingService extends Actor {
import akka.cluster.ddata.Replicator._
import VotingService._
val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system)
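// Two one-way Flags model the contest lifecycle (opened, then closed) and a
// PNCounterMap holds one vote counter per participant.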
val OpenedKey = FlagKey("contestOpened")
val ClosedKey = FlagKey("contestClosed")
val CountersKey = PNCounterMapKey("contestCounters")
replicator ! Subscribe(OpenedKey, self)
def receive = {
case Open =>
replicator ! Update(OpenedKey, Flag(), WriteAll(5.seconds))(_.switchOn)
becomeOpen()
case c @ Changed(OpenedKey) if c.get(OpenedKey).enabled =>
becomeOpen()
case GetVotes =>
sender() ! Votes(Map.empty, open = false)
}
def becomeOpen(): Unit = {
replicator ! Unsubscribe(OpenedKey, self)
replicator ! Subscribe(ClosedKey, self)
context.become(open orElse getVotes(open = true))
}
def open: Receive = {
case v @ Vote(participant) =>
val update = Update(CountersKey, PNCounterMap(), WriteLocal, request = Some(v)) {
_.increment(participant, 1)
}
replicator ! update
case _: UpdateSuccess[_] =>
case Close =>
replicator ! Update(ClosedKey, Flag(), WriteAll(5.seconds))(_.switchOn)
context.become(getVotes(open = false))
case c @ Changed(ClosedKey) if c.get(ClosedKey).enabled =>
context.become(getVotes(open = false))
}
def getVotes(open: Boolean): Receive = {
case GetVotes =>
replicator ! Get(CountersKey, ReadAll(3.seconds), Some(GetVotesReq(sender())))
case g @ GetSuccess(CountersKey, Some(GetVotesReq(replyTo))) =>
val data = g.get(CountersKey)
replyTo ! Votes(data.entries, open)
case NotFound(CountersKey, Some(GetVotesReq(replyTo))) =>
replyTo ! Votes(Map.empty, open)
case _: GetFailure[_] =>
case _: UpdateSuccess[_] =>
}
}
class VotingContestSpecMultiJvmNode1 extends VotingContestSpec
class VotingContestSpecMultiJvmNode2 extends VotingContestSpec
class VotingContestSpecMultiJvmNode3 extends VotingContestSpec
class VotingContestSpec extends MultiNodeSpec(VotingContestSpec) with STMultiNodeSpec with ImplicitSender {
import VotingContestSpec._
override def initialParticipants = roles.size
val cluster = Cluster(system)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated voting" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"count votes correctly" in within(15.seconds) {
import VotingService._
val votingService = system.actorOf(Props[VotingService], "votingService")
val N = 1000
runOn(node1) {
votingService ! Open
for (n <- 1 to N) {
votingService ! Vote("#" + ((n % 20) + 1))
}
}
runOn(node2, node3) {
// wait for it to open
val p = TestProbe()
awaitAssert {
votingService.tell(GetVotes, p.ref)
p.expectMsgPF(3.seconds) { case Votes(_, true) => true }
}
for (n <- 1 to N) {
votingService ! Vote("#" + ((n % 20) + 1))
}
}
enterBarrier("voting-done")
runOn(node3) {
votingService ! Close
}
val expected = (1 to 20).map(n => "#" + n -> BigInt(3L * N / 20)).toMap
awaitAssert {
votingService ! GetVotes
expectMsg(3.seconds, Votes(expected, false))
}
enterBarrier("after-2")
}
}
}


@@ -1,7 +1,7 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
-package akka.cluster.ddata.sample
+package akka.cluster.ddata
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
@@ -10,12 +10,19 @@ import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.Replicator.Changed
import akka.cluster.ddata.Replicator.GetKeyIds
import akka.cluster.ddata.Replicator.GetKeyIdsResult
import akka.cluster.ddata.Replicator.Subscribe
import akka.cluster.ddata.Replicator.Update
import akka.cluster.ddata.Replicator.UpdateResponse
import akka.cluster.ddata.Replicator.WriteLocal
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.Replicator
import akka.cluster.ddata.ORSetKey
/**
* This "sample" simulates lots of data entries, and can be used for
* optimizing replication (e.g. catch-up when adding more nodes).
*/
object LotsOfDataBot {
def main(args: Array[String]): Unit = {


@@ -1,98 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.ddata.sample
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.ORSet
import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.Replicator
import akka.cluster.ddata.ORSetKey
object DataBot {
def main(args: Array[String]): Unit = {
if (args.isEmpty)
startup(Seq("2551", "2552", "0"))
else
startup(args)
}
def startup(ports: Seq[String]): Unit = {
ports.foreach { port =>
// Override the configuration of the port
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.load(
ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
akka.cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
""")))
// Create an Akka system
val system = ActorSystem("ClusterSystem", config)
// Create an actor that handles cluster domain events
system.actorOf(Props[DataBot], name = "dataBot")
}
}
private case object Tick
}
class DataBot extends Actor with ActorLogging {
import DataBot._
import Replicator._
val replicator = DistributedData(context.system).replicator
implicit val node = Cluster(context.system)
import context.dispatcher
val tickTask = context.system.scheduler.schedule(5.seconds, 5.seconds, self, Tick)
val DataKey = ORSetKey[String]("key")
replicator ! Subscribe(DataKey, self)
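// Every tick, randomly add or remove one lowercase letter ('a' to 'z') in the
// replicated ORSet; Changed notifications log the merged set of elements.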
def receive = {
case Tick =>
val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
if (ThreadLocalRandom.current().nextBoolean()) {
// add
log.info("Adding: {}", s)
replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ + s)
} else {
// remove
log.info("Removing: {}", s)
replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ - s)
}
case _: UpdateResponse[_] => // ignore
case c @ Changed(DataKey) =>
log.info("Current elements: {}", c.get(DataKey).elements)
}
override def postStop(): Unit = tickTask.cancel()
}