Merge branch 'master' into 21648-Prefer_reachable_nodes_in_consistency-jgordijn

Patrik Nordwall committed 2017-01-13 10:21:09 +01:00 (committed by GitHub)
commit a8f9ad4775
37 changed files with 2442 additions and 625 deletions


@@ -177,6 +177,8 @@ All documentation must abide by the following maxims:
 All documentation is preferred to be in Lightbend's standard documentation format [reStructuredText](http://doc.akka.io/docs/akka/snapshot/dev/documentation.html) compiled using Lightbend's customized [Sphinx](http://sphinx.pocoo.org/) based documentation generation system, which among other things allows all code in the documentation to be externalized into compiled files and imported into the documentation.
+To learn about how to build the documentation locally see the Reference Docs section about this topic: [Build the documentation](http://doc.akka.io/docs/akka/2.4/dev/documentation.html#Build_the_documentation).
 For more info, or for a starting point for new projects, look at the [Lightbend Documentation Template project](https://github.com/typesafehub/doc-template).
 For larger projects that have invested a lot of time and resources into their current documentation and samples scheme (like for example Play), it is understandable that it will take some time to migrate to this new model. In these cases someone from the project needs to take the responsibility of manual QA and verifier for the documentation and samples.


@@ -592,7 +592,7 @@ trait Actor {
    */
   def unhandled(message: Any): Unit = {
     message match {
-      case Terminated(dead) ⇒ throw new DeathPactException(dead)
+      case Terminated(dead) ⇒ throw DeathPactException(dead)
       case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
     }
   }
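
The only functional content of this hunk is dropping a redundant `new`: `DeathPactException` is a case class, so its companion `apply` constructs it. A stand-in sketch of the same idiom (illustrative class, not Akka's actual definition):

```scala
// A case class gets a generated companion apply, so Foo(x) and new Foo(x)
// build equal instances; the explicit `new` is just noise.
final case class DeathPact(dead: String) extends RuntimeException(s"death pact with [$dead]")

val a = DeathPact("worker-1")      // companion apply
val b = new DeathPact("worker-1")  // explicit constructor, equivalent
assert(a == b)                     // case classes compare structurally
```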


@@ -812,12 +812,21 @@ object ClusterReceptionist {
    */
   class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging {
     context.setReceiveTimeout(timeout)
+
+    private val isAsk = {
+      val pathElements = client.path.elements
+      pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$")
+    }
+
     def receive = {
       case Ping ⇒ // keep alive from client
       case ReceiveTimeout ⇒
         log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path)
         context stop self
-      case msg ⇒ client.tell(msg, Actor.noSender)
+      case msg ⇒
+        client.tell(msg, Actor.noSender)
+        if (isAsk)
+          context stop self
     }
   }
 }
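
The `isAsk` heuristic above keys off how the ask pattern names its temporary actors: the promise actors backing `ask` live directly under the `/temp` guardian with synthesized names such as `$a`, so their path has exactly two elements. A self-contained sketch of the shape being matched (paths are illustrative):

```scala
// e.g. akka://sys/temp/$a   -> elements List("temp", "$a")  (ask)
// e.g. akka://sys/user/svc  -> elements List("user", "svc") (plain tell)
def looksLikeAsk(elements: List[String]): Boolean =
  elements.size == 2 && elements.head == "temp" && elements.tail.head.startsWith("$")

assert(looksLikeAsk(List("temp", "$a")))
assert(!looksLikeAsk(List("user", "svc")))
```

Stopping the tunnel right after the first reply is reasonable for ask clients, because the promise actor behind `ask` completes after a single message anyway.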


@@ -213,6 +213,24 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
       enterBarrier("after-2")
     }

+    "work with ask" in within(10 seconds) {
+      runOn(client) {
+        import akka.pattern.ask
+        import system.dispatcher
+        val c = system.actorOf(ClusterClient.props(
+          ClusterClientSettings(system).withInitialContacts(initialContacts)), "ask-client")
+        implicit val timeout = Timeout(remaining)
+        val reply = c ? ClusterClient.Send("/user/testService", "hello-request", localAffinity = true)
+        Await.result(reply.mapTo[Reply], remaining).msg should be("hello-request-ack")
+        system.stop(c)
+      }
+      runOn(fourth) {
+        expectMsg("hello-request")
+      }
+      enterBarrier("after-3")
+    }
+
     "demonstrate usage" in within(15 seconds) {
       def host1 = first
       def host2 = second
@@ -261,7 +279,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
       // strange, barriers fail without this sleep
       Thread.sleep(1000)
-      enterBarrier("after-3")
+      enterBarrier("after-4")
     }

     "report events" in within(15 seconds) {
@@ -305,7 +323,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
         }
       }
-      enterBarrier("after-6")
+      enterBarrier("after-5")
     }

     "re-establish connection to another receptionist when server is shutdown" in within(30 seconds) {
@@ -356,7 +374,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
           }
         }
       }
-      enterBarrier("after-4")
+      enterBarrier("after-6")
     }

     "re-establish connection to receptionist after partition" in within(30 seconds) {
@@ -397,7 +415,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
         system.stop(c)
       }
-      enterBarrier("after-5")
+      enterBarrier("after-7")
     }

     "re-establish connection to receptionist after server restart" in within(30 seconds) {
@@ -436,8 +454,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
         system.name,
         ConfigFactory.parseString(
           if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port"
-          else s"akka.remote.netty.tcp.port=$port"
-        ).withFallback(system.settings.config))
+          else s"akka.remote.netty.tcp.port=$port").withFallback(system.settings.config))
       Cluster(sys2).join(Cluster(sys2).selfAddress)
       val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2")
       ClusterClientReceptionist(sys2).registerService(service2)


@@ -1209,6 +1209,10 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe
       remainingSeedNodes foreach { a ⇒ context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin }
     } else {
       // no InitJoinAck received, initialize new cluster by joining myself
+      if (log.isDebugEnabled)
+        log.debug(
+          "Couldn't join other seed nodes, will join myself. seed-nodes=[{}]",
+          seedNodes.mkString(", "))
       context.parent ! JoinTo(selfAddress)
       context.stop(self)
     }
@@ -1262,11 +1266,14 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
   context.setReceiveTimeout(Cluster(context.system).settings.SeedNodeTimeout)

+  var attempt = 0
+
   override def preStart(): Unit = self ! JoinSeedNode

   def receive = {
     case JoinSeedNode ⇒
       // send InitJoin to all seed nodes (except myself)
+      attempt += 1
       seedNodes.collect {
         case a if a != selfAddress ⇒ context.actorSelection(context.parent.path.toStringWithAddress(a))
       } foreach { _ ! InitJoin }
@@ -1276,6 +1283,10 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
       context.become(done)
     case InitJoinNack(_) ⇒ // that seed was uninitialized
     case ReceiveTimeout ⇒
+      if (attempt >= 2)
+        log.warning(
+          "Couldn't join seed nodes after [{}] attempts, will try again. seed-nodes=[{}]",
+          attempt, seedNodes.filterNot(_ == selfAddress).mkString(", "))
      // no InitJoinAck received, try again
      self ! JoinSeedNode
   }
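
The second hunk wires a simple self-retry loop driven by `ReceiveTimeout`, escalating to a warning once a retry has also failed. A standalone sketch of the same pattern with illustrative names:

```scala
import akka.actor.{ Actor, ActorLogging, ReceiveTimeout }
import scala.concurrent.duration._

// On every receive timeout, re-send the trigger message to self; warn
// once the first retry has also produced no answer.
class RetryingJoiner extends Actor with ActorLogging {
  case object TryJoin
  context.setReceiveTimeout(5.seconds)
  var attempt = 0

  override def preStart(): Unit = self ! TryJoin

  def receive = {
    case TryJoin ⇒
      attempt += 1
      // ... probe the seed nodes here ...
    case ReceiveTimeout ⇒
      if (attempt >= 2)
        log.warning("No answer after [{}] attempts, will try again", attempt)
      self ! TryJoin
  }
}
```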


@@ -58,8 +58,11 @@ message VersionVector {

 message ORMap {
   message Entry {
-    required string key = 1;
+    optional string stringKey = 1;
     required OtherMessage value = 2;
+    optional sint32 intKey = 3;
+    optional sint64 longKey = 4;
+    optional OtherMessage otherKey = 5;
   }

   required ORSet keys = 1;
@@ -68,8 +71,11 @@ message ORMap {

 message LWWMap {
   message Entry {
-    required string key = 1;
+    optional string stringKey = 1;
     required LWWRegister value = 2;
+    optional sint32 intKey = 3;
+    optional sint64 longKey = 4;
+    optional OtherMessage otherKey = 5;
   }

   required ORSet keys = 1;
@@ -78,8 +84,11 @@ message LWWMap {

 message PNCounterMap {
   message Entry {
-    required string key = 1;
+    optional string stringKey = 1;
     required PNCounter value = 2;
+    optional sint32 intKey = 3;
+    optional sint64 longKey = 4;
+    optional OtherMessage otherKey = 5;
   }

   required ORSet keys = 1;
@@ -88,8 +97,11 @@ message PNCounterMap {

 message ORMultiMap {
   message Entry {
-    required string key = 1;
+    optional string stringKey = 1;
     required ORSet value = 2;
+    optional sint32 intKey = 3;
+    optional sint64 longKey = 4;
+    optional OtherMessage otherKey = 5;
   }

   required ORSet keys = 1;
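
The schema change is the same in all four map types: the single `required string key = 1` becomes a set of `optional` key fields, one per supported key type. The string variant keeps its field number and type, so existing String-keyed data stays wire-compatible, and a reader expects exactly one key field to be set. A sketch of the read-side dispatch, consistent with the serializer code added later in this commit (`rd` is its alias for the generated `ReplicatedDataMessages`):

```scala
// Recover the typed key from a deserialized Entry by probing the
// optional fields in order: string, int, long, then OtherMessage.
def entryKey(e: rd.ORMap.Entry): Any =
  if (e.hasStringKey) e.getStringKey
  else if (e.hasIntKey) e.getIntKey
  else if (e.hasLongKey) e.getLongKey
  else e.getOtherKey
```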


@@ -7,18 +7,18 @@ import akka.cluster.Cluster
 import akka.cluster.UniqueAddress

 object LWWMap {
-  private val _empty: LWWMap[Any] = new LWWMap(ORMap.empty)
-  def empty[A]: LWWMap[A] = _empty.asInstanceOf[LWWMap[A]]
-  def apply(): LWWMap[Any] = _empty
+  private val _empty: LWWMap[Any, Any] = new LWWMap(ORMap.empty)
+  def empty[A, B]: LWWMap[A, B] = _empty.asInstanceOf[LWWMap[A, B]]
+  def apply(): LWWMap[Any, Any] = _empty
   /**
    * Java API
    */
-  def create[A](): LWWMap[A] = empty
+  def create[A, B](): LWWMap[A, B] = empty

   /**
    * Extract the [[LWWMap#entries]].
    */
-  def unapply[A](m: LWWMap[A]): Option[Map[String, A]] = Some(m.entries)
+  def unapply[A, B](m: LWWMap[A, B]): Option[Map[A, B]] = Some(m.entries)
 }

 /**
@@ -44,29 +44,29 @@ object LWWMap {
  * This class is immutable, i.e. "modifying" methods return a new instance.
  */
 @SerialVersionUID(1L)
-final class LWWMap[A] private[akka] (
-  private[akka] val underlying: ORMap[LWWRegister[A]])
+final class LWWMap[A, B] private[akka] (
+  private[akka] val underlying: ORMap[A, LWWRegister[B]])
   extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
   import LWWRegister.{ Clock, defaultClock }

-  type T = LWWMap[A]
+  type T = LWWMap[A, B]

   /**
    * Scala API: All entries of the map.
    */
-  def entries: Map[String, A] = underlying.entries.map { case (k, r) ⇒ k → r.value }
+  def entries: Map[A, B] = underlying.entries.map { case (k, r) ⇒ k → r.value }

   /**
    * Java API: All entries of the map.
    */
-  def getEntries(): java.util.Map[String, A] = {
+  def getEntries(): java.util.Map[A, B] = {
     import scala.collection.JavaConverters._
     entries.asJava
   }

-  def get(key: String): Option[A] = underlying.get(key).map(_.value)
+  def get(key: A): Option[B] = underlying.get(key).map(_.value)

-  def contains(key: String): Boolean = underlying.contains(key)
+  def contains(key: A): Boolean = underlying.contains(key)

   def isEmpty: Boolean = underlying.isEmpty
@@ -75,7 +75,7 @@ final class LWWMap[A] private[akka] (
   /**
    * Adds an entry to the map
    */
-  def +(entry: (String, A))(implicit node: Cluster): LWWMap[A] = {
+  def +(entry: (A, B))(implicit node: Cluster): LWWMap[A, B] = {
     val (key, value) = entry
     put(node, key, value)
   }
@@ -83,8 +83,8 @@ final class LWWMap[A] private[akka] (
   /**
    * Adds an entry to the map
    */
-  def put(node: Cluster, key: String, value: A): LWWMap[A] =
-    put(node, key, value, defaultClock[A])
+  def put(node: Cluster, key: A, value: B): LWWMap[A, B] =
+    put(node, key, value, defaultClock[B])

   /**
    * Adds an entry to the map.
@@ -94,7 +94,7 @@ final class LWWMap[A] private[akka] (
    * increasing version number from a database record that is used for optimistic
    * concurrency control.
    */
-  def put(node: Cluster, key: String, value: A, clock: Clock[A]): LWWMap[A] =
+  def put(node: Cluster, key: A, value: B, clock: Clock[B]): LWWMap[A, B] =
     put(node.selfUniqueAddress, key, value, clock)

   /**
@@ -105,13 +105,13 @@ final class LWWMap[A] private[akka] (
    * increasing version number from a database record that is used for optimistic
    * concurrency control.
    */
-  def put(key: String, value: A)(implicit node: Cluster, clock: Clock[A] = defaultClock[A]): LWWMap[A] =
+  def put(key: A, value: B)(implicit node: Cluster, clock: Clock[B] = defaultClock[B]): LWWMap[A, B] =
     put(node, key, value, clock)

   /**
    * INTERNAL API
    */
-  private[akka] def put(node: UniqueAddress, key: String, value: A, clock: Clock[A]): LWWMap[A] = {
+  private[akka] def put(node: UniqueAddress, key: A, value: B, clock: Clock[B]): LWWMap[A, B] = {
     val newRegister = underlying.get(key) match {
       case Some(r) ⇒ r.withValue(node, value, clock)
       case None    ⇒ LWWRegister(node, value, clock)
@@ -124,32 +124,32 @@ final class LWWMap[A] private[akka] (
    * Note that if there is a conflicting update on another node the entry will
    * not be removed after merge.
    */
-  def -(key: String)(implicit node: Cluster): LWWMap[A] = remove(node, key)
+  def -(key: A)(implicit node: Cluster): LWWMap[A, B] = remove(node, key)

   /**
    * Removes an entry from the map.
    * Note that if there is a conflicting update on another node the entry will
    * not be removed after merge.
    */
-  def remove(node: Cluster, key: String): LWWMap[A] =
+  def remove(node: Cluster, key: A): LWWMap[A, B] =
     remove(node.selfUniqueAddress, key)

   /**
    * INTERNAL API
    */
-  private[akka] def remove(node: UniqueAddress, key: String): LWWMap[A] =
+  private[akka] def remove(node: UniqueAddress, key: A): LWWMap[A, B] =
     new LWWMap(underlying.remove(node, key))

-  override def merge(that: LWWMap[A]): LWWMap[A] =
+  override def merge(that: LWWMap[A, B]): LWWMap[A, B] =
     new LWWMap(underlying.merge(that.underlying))

   override def needPruningFrom(removedNode: UniqueAddress): Boolean =
     underlying.needPruningFrom(removedNode)

-  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): LWWMap[A] =
+  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): LWWMap[A, B] =
     new LWWMap(underlying.prune(removedNode, collapseInto))

-  override def pruningCleanup(removedNode: UniqueAddress): LWWMap[A] =
+  override def pruningCleanup(removedNode: UniqueAddress): LWWMap[A, B] =
     new LWWMap(underlying.pruningCleanup(removedNode))

   // this class cannot be a `case class` because we need different `unapply`
@@ -157,16 +157,16 @@ final class LWWMap[A] private[akka] (
   override def toString: String = s"LWW$entries" //e.g. LWWMap(a -> 1, b -> 2)

   override def equals(o: Any): Boolean = o match {
-    case other: LWWMap[_] ⇒ underlying == other.underlying
+    case other: LWWMap[_, _] ⇒ underlying == other.underlying
     case _ ⇒ false
   }

   override def hashCode: Int = underlying.hashCode
 }

 object LWWMapKey {
-  def create[A](id: String): Key[LWWMap[A]] = LWWMapKey(id)
+  def create[A, B](id: String): Key[LWWMap[A, B]] = LWWMapKey(id)
 }

 @SerialVersionUID(1L)
-final case class LWWMapKey[A](_id: String) extends Key[LWWMap[A]](_id) with ReplicatedDataSerialization
+final case class LWWMapKey[A, B](_id: String) extends Key[LWWMap[A, B]](_id) with ReplicatedDataSerialization
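
Taken together, `LWWMap` (and its `Key`) now carries the key type as a first parameter, so keys are no longer forced to be `String`. A minimal usage sketch against the new signatures (assumes an `ActorSystem` configured with the cluster provider; names are illustrative):

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.LWWMap

val system = ActorSystem("demo")
implicit val node: Cluster = Cluster(system)

val m0 = LWWMap.empty[Int, String]   // Int keys, previously impossible
val m1 = m0.put(node, 1, "one")
val m2 = m1 + (2 -> "two")           // operator form, implicit Cluster
println(m2.entries)                  // Map(1 -> one, 2 -> two)
println(m2.get(2))                   // Some(two)
val m3 = m2 - 1                      // removal also takes the implicit Cluster
```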


@@ -8,18 +8,18 @@ import akka.cluster.UniqueAddress
 import akka.util.HashCode

 object ORMap {
-  private val _empty: ORMap[ReplicatedData] = new ORMap(ORSet.empty, Map.empty)
-  def empty[A <: ReplicatedData]: ORMap[A] = _empty.asInstanceOf[ORMap[A]]
-  def apply(): ORMap[ReplicatedData] = _empty
+  private val _empty: ORMap[Any, ReplicatedData] = new ORMap(ORSet.empty, Map.empty)
+  def empty[A, B <: ReplicatedData]: ORMap[A, B] = _empty.asInstanceOf[ORMap[A, B]]
+  def apply(): ORMap[Any, ReplicatedData] = _empty
   /**
    * Java API
    */
-  def create[A <: ReplicatedData](): ORMap[A] = empty[A]
+  def create[A, B <: ReplicatedData](): ORMap[A, B] = empty[A, B]

   /**
    * Extract the [[ORMap#entries]].
    */
-  def unapply[A <: ReplicatedData](m: ORMap[A]): Option[Map[String, A]] = Some(m.entries)
+  def unapply[A, B <: ReplicatedData](m: ORMap[A, B]): Option[Map[A, B]] = Some(m.entries)
 }
@@ -32,35 +32,35 @@ object ORMap {
  * This class is immutable, i.e. "modifying" methods return a new instance.
  */
 @SerialVersionUID(1L)
-final class ORMap[A <: ReplicatedData] private[akka] (
-  private[akka] val keys: ORSet[String],
-  private[akka] val values: Map[String, A])
+final class ORMap[A, B <: ReplicatedData] private[akka] (
+  private[akka] val keys: ORSet[A],
+  private[akka] val values: Map[A, B])
   extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {

-  type T = ORMap[A]
+  type T = ORMap[A, B]

   /**
    * Scala API: All entries of the map.
    */
-  def entries: Map[String, A] = values
+  def entries: Map[A, B] = values

   /**
    * Java API: All entries of the map.
    */
-  def getEntries(): java.util.Map[String, A] = {
+  def getEntries(): java.util.Map[A, B] = {
     import scala.collection.JavaConverters._
     entries.asJava
   }

-  def get(key: String): Option[A] = values.get(key)
+  def get(key: A): Option[B] = values.get(key)

   /**
    * Scala API: Get the value associated with the key if there is one,
    * else return the given default.
    */
-  def getOrElse(key: String, default: A): A = values.getOrElse(key, default)
+  def getOrElse(key: A, default: B): B = values.getOrElse(key, default)

-  def contains(key: String): Boolean = values.contains(key)
+  def contains(key: A): Boolean = values.contains(key)

   def isEmpty: Boolean = values.isEmpty
@@ -70,7 +70,7 @@ final class ORMap[A <: ReplicatedData] private[akka] (
    * Adds an entry to the map
    * @see [[#put]]
    */
-  def +(entry: (String, A))(implicit node: Cluster): ORMap[A] = {
+  def +(entry: (A, B))(implicit node: Cluster): ORMap[A, B] = {
     val (key, value) = entry
     put(node, key, value)
   }
@@ -88,12 +88,12 @@ final class ORMap[A <: ReplicatedData] private[akka] (
    * value, because important history can be lost when replacing the `ORSet` and
    * undesired effects of merging will occur. Use [[ORMultiMap]] or [[#updated]] instead.
    */
-  def put(node: Cluster, key: String, value: A): ORMap[A] = put(node.selfUniqueAddress, key, value)
+  def put(node: Cluster, key: A, value: B): ORMap[A, B] = put(node.selfUniqueAddress, key, value)

   /**
    * INTERNAL API
    */
-  private[akka] def put(node: UniqueAddress, key: String, value: A): ORMap[A] =
+  private[akka] def put(node: UniqueAddress, key: A, value: B): ORMap[A, B] =
     if (value.isInstanceOf[ORSet[_]] && values.contains(key))
       throw new IllegalArgumentException(
         "`ORMap.put` must not be used to replace an existing `ORSet` " +
@@ -108,7 +108,7 @@ final class ORMap[A <: ReplicatedData] private[akka] (
    * If there is no current value for the `key` the `initial` value will be
    * passed to the `modify` function.
    */
-  def updated(node: Cluster, key: String, initial: A)(modify: A ⇒ A): ORMap[A] =
+  def updated(node: Cluster, key: A, initial: B)(modify: B ⇒ B): ORMap[A, B] =
     updated(node.selfUniqueAddress, key, initial)(modify)

   /**
@@ -117,13 +117,13 @@ final class ORMap[A <: ReplicatedData] private[akka] (
    * If there is no current value for the `key` the `initial` value will be
    * passed to the `modify` function.
    */
-  def updated(node: Cluster, key: String, initial: A, modify: java.util.function.Function[A, A]): ORMap[A] =
+  def updated(node: Cluster, key: A, initial: B, modify: java.util.function.Function[B, B]): ORMap[A, B] =
     updated(node, key, initial)(value ⇒ modify.apply(value))

   /**
    * INTERNAL API
    */
-  private[akka] def updated(node: UniqueAddress, key: String, initial: A)(modify: A ⇒ A): ORMap[A] = {
+  private[akka] def updated(node: UniqueAddress, key: A, initial: B)(modify: B ⇒ B): ORMap[A, B] = {
     val newValue = values.get(key) match {
       case Some(old) ⇒ modify(old)
       case _         ⇒ modify(initial)
@@ -136,25 +136,25 @@ final class ORMap[A <: ReplicatedData] private[akka] (
    * Note that if there is a conflicting update on another node the entry will
    * not be removed after merge.
    */
-  def -(key: String)(implicit node: Cluster): ORMap[A] = remove(node, key)
+  def -(key: A)(implicit node: Cluster): ORMap[A, B] = remove(node, key)

   /**
    * Removes an entry from the map.
    * Note that if there is a conflicting update on another node the entry will
    * not be removed after merge.
    */
-  def remove(node: Cluster, key: String): ORMap[A] = remove(node.selfUniqueAddress, key)
+  def remove(node: Cluster, key: A): ORMap[A, B] = remove(node.selfUniqueAddress, key)

   /**
    * INTERNAL API
    */
-  private[akka] def remove(node: UniqueAddress, key: String): ORMap[A] = {
+  private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = {
     new ORMap(keys.remove(node, key), values - key)
   }

-  override def merge(that: ORMap[A]): ORMap[A] = {
+  override def merge(that: ORMap[A, B]): ORMap[A, B] = {
     val mergedKeys = keys.merge(that.keys)
-    var mergedValues = Map.empty[String, A]
+    var mergedValues = Map.empty[A, B]
     mergedKeys.elementsMap.keysIterator.foreach { key ⇒
       (this.values.get(key), that.values.get(key)) match {
         case (Some(thisValue), Some(thatValue)) ⇒
@@ -164,7 +164,7 @@ final class ORMap[A <: ReplicatedData] private[akka] (
             throw new IllegalArgumentException(errMsg)
           }
           // TODO can we get rid of these (safe) casts?
-          val mergedValue = thisValue.merge(thatValue.asInstanceOf[thisValue.T]).asInstanceOf[A]
+          val mergedValue = thisValue.merge(thatValue.asInstanceOf[thisValue.T]).asInstanceOf[B]
           mergedValues = mergedValues.updated(key, mergedValue)
         case (Some(thisValue), None) ⇒
           mergedValues = mergedValues.updated(key, thisValue)
@@ -184,21 +184,21 @@ final class ORMap[A <: ReplicatedData] private[akka] (
     }
   }

-  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): ORMap[A] = {
+  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): ORMap[A, B] = {
     val prunedKeys = keys.prune(removedNode, collapseInto)
     val prunedValues = values.foldLeft(values) {
       case (acc, (key, data: RemovedNodePruning)) if data.needPruningFrom(removedNode) ⇒
-        acc.updated(key, data.prune(removedNode, collapseInto).asInstanceOf[A])
+        acc.updated(key, data.prune(removedNode, collapseInto).asInstanceOf[B])
       case (acc, _) ⇒ acc
     }
     new ORMap(prunedKeys, prunedValues)
   }

-  override def pruningCleanup(removedNode: UniqueAddress): ORMap[A] = {
+  override def pruningCleanup(removedNode: UniqueAddress): ORMap[A, B] = {
     val pruningCleanupedKeys = keys.pruningCleanup(removedNode)
     val pruningCleanupedValues = values.foldLeft(values) {
       case (acc, (key, data: RemovedNodePruning)) if data.needPruningFrom(removedNode) ⇒
-        acc.updated(key, data.pruningCleanup(removedNode).asInstanceOf[A])
+        acc.updated(key, data.pruningCleanup(removedNode).asInstanceOf[B])
       case (acc, _) ⇒ acc
     }
     new ORMap(pruningCleanupedKeys, pruningCleanupedValues)
@@ -209,8 +209,8 @@ final class ORMap[A <: ReplicatedData] private[akka] (
   override def toString: String = s"OR$entries"

   override def equals(o: Any): Boolean = o match {
-    case other: ORMap[_] ⇒ keys == other.keys && values == other.values
+    case other: ORMap[_, _] ⇒ keys == other.keys && values == other.values
     case _ ⇒ false
   }

   override def hashCode: Int = {
@@ -223,8 +223,8 @@ final class ORMap[A <: ReplicatedData] private[akka] (
 }

 object ORMapKey {
-  def create[A <: ReplicatedData](id: String): Key[ORMap[A]] = ORMapKey(id)
+  def create[A, B <: ReplicatedData](id: String): Key[ORMap[A, B]] = ORMapKey(id)
 }

 @SerialVersionUID(1L)
-final case class ORMapKey[A <: ReplicatedData](_id: String) extends Key[ORMap[A]](_id) with ReplicatedDataSerialization
+final case class ORMapKey[A, B <: ReplicatedData](_id: String) extends Key[ORMap[A, B]](_id) with ReplicatedDataSerialization
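
`ORMap` follows the same pattern: the key type `A` is free while values stay bound to `ReplicatedData`. A sketch using `GCounter` as the value type (assumes the implicit `node: Cluster` from the earlier sketch; the `GCounter.empty`/`increment` calls are my assumption of the counter API, not shown in this diff):

```scala
import akka.cluster.ddata.{ GCounter, ORMap }

// updated() merges through the existing value instead of replacing it,
// which is why it is preferred over put() for ORSet/counter values.
val m0 = ORMap.empty[String, GCounter]
val m1 = m0.updated(node, "hits", GCounter.empty)(_.increment(node, 1))
val m2 = m1.updated(node, "hits", GCounter.empty)(_.increment(node, 2))
println(m2.get("hits").map(_.value))   // Some(3)
```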


@@ -7,29 +7,29 @@ import akka.cluster.{ UniqueAddress, Cluster }

 object ORMultiMap {

-  val _empty: ORMultiMap[Any] = new ORMultiMap(ORMap.empty)
+  val _empty: ORMultiMap[Any, Any] = new ORMultiMap(ORMap.empty)
   /**
    * Provides an empty multimap.
    */
-  def empty[A]: ORMultiMap[A] = _empty.asInstanceOf[ORMultiMap[A]]
-  def apply(): ORMultiMap[Any] = _empty
+  def empty[A, B]: ORMultiMap[A, B] = _empty.asInstanceOf[ORMultiMap[A, B]]
+  def apply(): ORMultiMap[Any, Any] = _empty

   /**
    * Java API
    */
-  def create[A](): ORMultiMap[A] = empty[A]
+  def create[A, B](): ORMultiMap[A, B] = empty[A, B]

   /**
    * Extract the [[ORMultiMap#entries]].
    */
-  def unapply[A](m: ORMultiMap[A]): Option[Map[String, Set[A]]] = Some(m.entries)
+  def unapply[A, B](m: ORMultiMap[A, B]): Option[Map[A, Set[B]]] = Some(m.entries)

   /**
    * Extract the [[ORMultiMap#entries]] of an `ORMultiMap`.
    */
-  def unapply(value: Any): Option[Map[String, Set[Any]]] = value match {
-    case m: ORMultiMap[Any] @unchecked ⇒ Some(m.entries)
+  def unapply[A, B <: ReplicatedData](value: Any): Option[Map[A, Set[B]]] = value match {
+    case m: ORMultiMap[A, B] @unchecked ⇒ Some(m.entries)
     case _ ⇒ None
   }
 }
@@ -40,10 +40,10 @@ object ORMultiMap {
  * This class is immutable, i.e. "modifying" methods return a new instance.
  */
 @SerialVersionUID(1L)
-final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORSet[A]])
+final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[A, ORSet[B]])
   extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {

-  override type T = ORMultiMap[A]
+  override type T = ORMultiMap[A, B]

   override def merge(that: T): T =
     new ORMultiMap(underlying.merge(that.underlying))
@@ -51,15 +51,15 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
   /**
    * Scala API: All entries of a multimap where keys are strings and values are sets.
    */
-  def entries: Map[String, Set[A]] =
+  def entries: Map[A, Set[B]] =
     underlying.entries.map { case (k, v) ⇒ k → v.elements }

   /**
    * Java API: All entries of a multimap where keys are strings and values are sets.
    */
-  def getEntries(): java.util.Map[String, java.util.Set[A]] = {
+  def getEntries(): java.util.Map[A, java.util.Set[B]] = {
     import scala.collection.JavaConverters._
-    val result = new java.util.HashMap[String, java.util.Set[A]]
+    val result = new java.util.HashMap[A, java.util.Set[B]]
     underlying.entries.foreach {
       case (k, v) ⇒ result.put(k, v.elements.asJava)
     }
@@ -69,17 +69,17 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
   /**
    * Get the set associated with the key if there is one.
    */
-  def get(key: String): Option[Set[A]] =
+  def get(key: A): Option[Set[B]] =
     underlying.get(key).map(_.elements)

   /**
    * Scala API: Get the set associated with the key if there is one,
    * else return the given default.
    */
-  def getOrElse(key: String, default: Set[A]): Set[A] =
+  def getOrElse(key: A, default: Set[B]): Set[B] =
     get(key).getOrElse(default)

-  def contains(key: String): Boolean = underlying.contains(key)
+  def contains(key: A): Boolean = underlying.contains(key)

   def isEmpty: Boolean = underlying.isEmpty
@@ -89,7 +89,7 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
    * Convenience for put. Requires an implicit Cluster.
    * @see [[#put]]
    */
-  def +(entry: (String, Set[A]))(implicit node: Cluster): ORMultiMap[A] = {
+  def +(entry: (A, Set[B]))(implicit node: Cluster): ORMultiMap[A, B] = {
     val (key, value) = entry
     put(node, key, value)
   }
@@ -98,14 +98,14 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
    * Scala API: Associate an entire set with the key while retaining the history of the previous
    * replicated data set.
    */
-  def put(node: Cluster, key: String, value: Set[A]): ORMultiMap[A] =
+  def put(node: Cluster, key: A, value: Set[B]): ORMultiMap[A, B] =
     put(node.selfUniqueAddress, key, value)

   /**
    * Java API: Associate an entire set with the key while retaining the history of the previous
    * replicated data set.
    */
-  def put(node: Cluster, key: String, value: java.util.Set[A]): ORMultiMap[A] = {
+  def put(node: Cluster, key: A, value: java.util.Set[B]): ORMultiMap[A, B] = {
     import scala.collection.JavaConverters._
     put(node, key, value.asScala.toSet)
   }
@@ -113,8 +113,8 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
   /**
    * INTERNAL API
    */
-  private[akka] def put(node: UniqueAddress, key: String, value: Set[A]): ORMultiMap[A] = {
-    val newUnderlying = underlying.updated(node, key, ORSet.empty[A]) { existing ⇒
+  private[akka] def put(node: UniqueAddress, key: A, value: Set[B]): ORMultiMap[A, B] = {
+    val newUnderlying = underlying.updated(node, key, ORSet.empty[B]) { existing ⇒
       value.foldLeft(existing.clear(node)) { (s, element) ⇒ s.add(node, element) }
     }
     new ORMultiMap(newUnderlying)
@@ -124,38 +124,38 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
    * Convenience for remove. Requires an implicit Cluster.
    * @see [[#remove]]
    */
-  def -(key: String)(implicit node: Cluster): ORMultiMap[A] =
+  def -(key: A)(implicit node: Cluster): ORMultiMap[A, B] =
     remove(node, key)

   /**
    * Remove an entire set associated with the key.
    */
-  def remove(node: Cluster, key: String): ORMultiMap[A] =
+  def remove(node: Cluster, key: A): ORMultiMap[A, B] =
     remove(node.selfUniqueAddress, key)

   /**
    * INTERNAL API
    */
-  private[akka] def remove(node: UniqueAddress, key: String): ORMultiMap[A] =
+  private[akka] def remove(node: UniqueAddress, key: A): ORMultiMap[A, B] =
     new ORMultiMap(underlying.remove(node, key))

   /**
    * Scala API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
    */
-  def addBinding(key: String, element: A)(implicit node: Cluster): ORMultiMap[A] =
+  def addBinding(key: A, element: B)(implicit node: Cluster): ORMultiMap[A, B] =
     addBinding(node.selfUniqueAddress, key, element)

   /**
    * Java API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
    */
-  def addBinding(node: Cluster, key: String, element: A): ORMultiMap[A] =
+  def addBinding(node: Cluster, key: A, element: B): ORMultiMap[A, B] =
     addBinding(key, element)(node)

   /**
    * INTERNAL API
    */
-  private[akka] def addBinding(node: UniqueAddress, key: String, element: A): ORMultiMap[A] = {
-    val newUnderlying = underlying.updated(node, key, ORSet.empty[A])(_.add(node, element))
+  private[akka] def addBinding(node: UniqueAddress, key: A, element: B): ORMultiMap[A, B] = {
+    val newUnderlying = underlying.updated(node, key, ORSet.empty[B])(_.add(node, element))
     new ORMultiMap(newUnderlying)
   }
@@ -163,22 +163,22 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
    * Scala API: Remove an element of a set associated with a key. If there are no more elements in the set then the
    * entire set will be removed.
    */
-  def removeBinding(key: String, element: A)(implicit node: Cluster): ORMultiMap[A] =
+  def removeBinding(key: A, element: B)(implicit node: Cluster): ORMultiMap[A, B] =
     removeBinding(node.selfUniqueAddress, key, element)

   /**
    * Java API: Remove an element of a set associated with a key. If there are no more elements in the set then the
    * entire set will be removed.
    */
-  def removeBinding(node: Cluster, key: String, element: A): ORMultiMap[A] =
+  def removeBinding(node: Cluster, key: A, element: B): ORMultiMap[A, B] =
     removeBinding(key, element)(node)

   /**
    * INTERNAL API
    */
-  private[akka] def removeBinding(node: UniqueAddress, key: String, element: A): ORMultiMap[A] = {
+  private[akka] def removeBinding(node: UniqueAddress, key: A, element: B): ORMultiMap[A, B] = {
     val newUnderlying = {
-      val u = underlying.updated(node, key, ORSet.empty[A])(_.remove(node, element))
+      val u = underlying.updated(node, key, ORSet.empty[B])(_.remove(node, element))
       u.get(key) match {
         case Some(s) if s.isEmpty ⇒ u.remove(node, key)
         case _ ⇒ u
@@ -192,13 +192,13 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
    * and another one is added within the same Update. The order of addition and removal is important in order
    * to retain history for replicated data.
    */
-  def replaceBinding(key: String, oldElement: A, newElement: A)(implicit node: Cluster): ORMultiMap[A] =
+  def replaceBinding(key: A, oldElement: B, newElement: B)(implicit node: Cluster): ORMultiMap[A, B] =
     replaceBinding(node.selfUniqueAddress, key, oldElement, newElement)

   /**
    * INTERNAL API
    */
-  private[akka] def replaceBinding(node: UniqueAddress, key: String, oldElement: A, newElement: A): ORMultiMap[A] =
+  private[akka] def replaceBinding(node: UniqueAddress, key: A, oldElement: B, newElement: B): ORMultiMap[A, B] =
     if (newElement != oldElement)
       addBinding(node, key, newElement).removeBinding(node, key, oldElement)
     else
@@ -218,16 +218,16 @@ final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORS
   override def toString: String = s"ORMulti$entries"

   override def equals(o: Any): Boolean = o match {
-    case other: ORMultiMap[_] ⇒ underlying == other.underlying
+    case other: ORMultiMap[_, _] ⇒ underlying == other.underlying
     case _ ⇒ false
   }

   override def hashCode: Int = underlying.hashCode
 }

 object ORMultiMapKey {
-  def create[A](id: String): Key[ORMultiMap[A]] = ORMultiMapKey(id)
+  def create[A, B](id: String): Key[ORMultiMap[A, B]] = ORMultiMapKey(id)
 }

 @SerialVersionUID(1L)
-final case class ORMultiMapKey[A](_id: String) extends Key[ORMultiMap[A]](_id) with ReplicatedDataSerialization
+final case class ORMultiMapKey[A, B](_id: String) extends Key[ORMultiMap[A, B]](_id) with ReplicatedDataSerialization
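
A sketch of the multimap operations with a non-String key, straight from the new signatures (again assuming an implicit `node: Cluster` in scope, as in the earlier sketches):

```scala
import akka.cluster.ddata.ORMultiMap

val mm0 = ORMultiMap.empty[Int, String]
val mm1 = mm0.addBinding(1, "a")                 // set is initialised on first binding
val mm2 = mm1.addBinding(1, "b").removeBinding(1, "a")
println(mm2.entries)                             // Map(1 -> Set(b))
val mm3 = mm2.put(node, 2, Set("x", "y"))        // replace an entire set at once
```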


@@ -8,17 +8,17 @@ import akka.cluster.UniqueAddress
 import java.math.BigInteger

 object PNCounterMap {
-  val empty: PNCounterMap = new PNCounterMap(ORMap.empty)
-  def apply(): PNCounterMap = empty
+  def empty[A]: PNCounterMap[A] = new PNCounterMap(ORMap.empty)
+  def apply[A](): PNCounterMap[A] = empty
   /**
    * Java API
    */
-  def create(): PNCounterMap = empty
+  def create[A](): PNCounterMap[A] = empty

   /**
    * Extract the [[PNCounterMap#entries]].
    */
-  def unapply(m: PNCounterMap): Option[Map[String, BigInt]] = Some(m.entries)
+  def unapply[A](m: PNCounterMap[A]): Option[Map[A, BigInt]] = Some(m.entries)
 }

 /**
@@ -27,17 +27,17 @@ object PNCounterMap {
  * This class is immutable, i.e. "modifying" methods return a new instance.
  */
 @SerialVersionUID(1L)
-final class PNCounterMap private[akka] (
-  private[akka] val underlying: ORMap[PNCounter])
+final class PNCounterMap[A] private[akka] (
+  private[akka] val underlying: ORMap[A, PNCounter])
   extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {

-  type T = PNCounterMap
+  type T = PNCounterMap[A]

   /** Scala API */
-  def entries: Map[String, BigInt] = underlying.entries.map { case (k, c) ⇒ k → c.value }
+  def entries: Map[A, BigInt] = underlying.entries.map { case (k, c) ⇒ k → c.value }

   /** Java API */
-  def getEntries: java.util.Map[String, BigInteger] = {
+  def getEntries: java.util.Map[A, BigInteger] = {
     import scala.collection.JavaConverters._
     underlying.entries.map { case (k, c) ⇒ k → c.value.bigInteger }.asJava
   }
@@ -45,14 +45,14 @@ final class PNCounterMap private[akka] (
   /**
    * Scala API: The count for a key
    */
-  def get(key: String): Option[BigInt] = underlying.get(key).map(_.value)
+  def get(key: A): Option[BigInt] = underlying.get(key).map(_.value)

   /**
    * Java API: The count for a key, or `null` if it doesn't exist
    */
-  def getValue(key: String): BigInteger = underlying.get(key).map(_.value.bigInteger).orNull
+  def getValue(key: A): BigInteger = underlying.get(key).map(_.value.bigInteger).orNull

-  def contains(key: String): Boolean = underlying.contains(key)
+  def contains(key: A): Boolean = underlying.contains(key)

   def isEmpty: Boolean = underlying.isEmpty
@@ -62,40 +62,40 @@ final class PNCounterMap private[akka] (
    * Increment the counter with the delta specified.
    * If the delta is negative then it will decrement instead of increment.
    */
-  def increment(key: String, delta: Long = 1)(implicit node: Cluster): PNCounterMap =
+  def increment(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] =
     increment(node, key, delta)

   /**
    * Increment the counter with the delta specified.
    * If the delta is negative then it will decrement instead of increment.
    */
-  def increment(node: Cluster, key: String, delta: Long): PNCounterMap =
+  def increment(node: Cluster, key: A, delta: Long): PNCounterMap[A] =
     increment(node.selfUniqueAddress, key, delta)

   /**
    * INTERNAL API
    */
-  private[akka] def increment(node: UniqueAddress, key: String, delta: Long): PNCounterMap =
+  private[akka] def increment(node: UniqueAddress, key: A, delta: Long): PNCounterMap[A] =
     new PNCounterMap(underlying.updated(node, key, PNCounter())(_.increment(node, delta)))

   /**
    * Decrement the counter with the delta specified.
    * If the delta is negative then it will increment instead of decrement.
    */
-  def decrement(key: String, delta: Long = 1)(implicit node: Cluster): PNCounterMap =
+  def decrement(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] =
     decrement(node, key, delta)

   /**
    * Decrement the counter with the delta specified.
    * If the delta is negative then it will increment instead of decrement.
    */
-  def decrement(node: Cluster, key: String, delta: Long): PNCounterMap =
+  def decrement(node: Cluster, key: A, delta: Long): PNCounterMap[A] =
     decrement(node.selfUniqueAddress, key, delta)

   /**
    * INTERNAL API
    */
-  private[akka] def decrement(node: UniqueAddress, key: String, delta: Long): PNCounterMap = {
+  private[akka] def decrement(node: UniqueAddress, key: A, delta: Long): PNCounterMap[A] = {
     new PNCounterMap(underlying.updated(node, key, PNCounter())(_.decrement(node, delta)))
   }
@@ -104,32 +104,32 @@ final class PNCounterMap private[akka] (
    * Note that if there is a conflicting update on another node the entry will
    * not be removed after merge.
    */
-  def -(key: String)(implicit node: Cluster): PNCounterMap = remove(node, key)
+  def -(key: A)(implicit node: Cluster): PNCounterMap[A] = remove(node, key)

   /**
    * Removes an entry from the map.
    * Note that if there is a conflicting update on another node the entry will
    * not be removed after merge.
    */
-  def remove(node: Cluster, key: String): PNCounterMap =
+  def remove(node: Cluster, key: A): PNCounterMap[A] =
     remove(node.selfUniqueAddress, key)

   /**
    * INTERNAL API
    */
-  private[akka] def remove(node: UniqueAddress, key: String): PNCounterMap =
+  private[akka] def remove(node: UniqueAddress, key: A): PNCounterMap[A] =
     new PNCounterMap(underlying.remove(node, key))

-  override def merge(that: PNCounterMap): PNCounterMap =
+  override def merge(that: PNCounterMap[A]): PNCounterMap[A] =
     new PNCounterMap(underlying.merge(that.underlying))

   override def needPruningFrom(removedNode: UniqueAddress): Boolean =
     underlying.needPruningFrom(removedNode)

-  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): PNCounterMap =
+  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): PNCounterMap[A] =
     new PNCounterMap(underlying.prune(removedNode, collapseInto))

-  override def pruningCleanup(removedNode: UniqueAddress): PNCounterMap =
+  override def pruningCleanup(removedNode: UniqueAddress): PNCounterMap[A] =
     new PNCounterMap(underlying.pruningCleanup(removedNode))

   // this class cannot be a `case class` because we need different `unapply`
@@ -137,16 +137,16 @@ final class PNCounterMap private[akka] (
   override def toString: String = s"PNCounter$entries"

   override def equals(o: Any): Boolean = o match {
-    case other: PNCounterMap ⇒ underlying == other.underlying
+    case other: PNCounterMap[A] ⇒ underlying == other.underlying
     case _ ⇒ false
   }

   override def hashCode: Int = underlying.hashCode
 }

 object PNCounterMapKey {
-  def create[A](id: String): Key[PNCounterMap] = PNCounterMapKey(id)
+  def create[A](id: String): Key[PNCounterMap[A]] = PNCounterMapKey[A](id)
 }

 @SerialVersionUID(1L)
-final case class PNCounterMapKey(_id: String) extends Key[PNCounterMap](_id) with ReplicatedDataSerialization
+final case class PNCounterMapKey[A](_id: String) extends Key[PNCounterMap[A]](_id) with ReplicatedDataSerialization
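
`PNCounterMap` only needs the key type parameter, since its values are always counters. A usage sketch against the new signatures (implicit `node: Cluster` assumed, as before):

```scala
import akka.cluster.ddata.PNCounterMap

val pc0 = PNCounterMap.empty[String]
val pc1 = pc0.increment("requests", 5)   // delta defaults to 1
val pc2 = pc1.decrement("requests", 2)
println(pc2.get("requests"))             // Some(3)
```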


@@ -3,10 +3,11 @@
  */
 package akka.cluster.ddata.protobuf

-import java.{ lang ⇒ jl }
+import java.{ util, lang ⇒ jl }
 import java.util.ArrayList
 import java.util.Collections
 import java.util.Comparator
+import java.util.TreeSet
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.breakOut
@@ -17,11 +18,137 @@ import akka.cluster.ddata.protobuf.msg.{ ReplicatedDataMessages ⇒ rd }
 import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages ⇒ dm }
 import akka.serialization.SerializerWithStringManifest
 import akka.serialization.BaseSerializer
-import akka.protobuf.ByteString
+import akka.protobuf.{ ByteString, GeneratedMessage }
 import akka.util.ByteString.UTF_8
 import scala.collection.immutable.TreeMap
 import akka.cluster.UniqueAddress
 import java.io.NotSerializableException
+import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage
+
+private object ReplicatedDataSerializer {
+  /*
+   * Generic superclass that allows comparing the Entry types used in protobuf.
+   */
+  abstract class KeyComparator[A <: GeneratedMessage] extends Comparator[A] {
+    /**
+     * Get the key from the entry. The key may be a String, Integer, Long, or Any.
+     * @param entry The protobuf entry used with Map types
+     * @return The Key
+     */
+    def getKey(entry: A): Any
+    final def compare(x: A, y: A): Int = compareKeys(getKey(x), getKey(y))
+    private final def compareKeys(t1: Any, t2: Any): Int = (t1, t2) match {
+      case (k1: String, k2: String)             ⇒ k1.compareTo(k2)
+      case (k1: String, k2)                     ⇒ -1
+      case (k1, k2: String)                     ⇒ 1
+      case (k1: Int, k2: Int)                   ⇒ k1.compareTo(k2)
+      case (k1: Int, k2)                        ⇒ -1
+      case (k1, k2: Int)                        ⇒ 1
+      case (k1: Long, k2: Long)                 ⇒ k1.compareTo(k2)
+      case (k1: Long, k2)                       ⇒ -1
+      case (k1, k2: Long)                       ⇒ 1
+      case (k1: OtherMessage, k2: OtherMessage) ⇒ OtherMessageComparator.compare(k1, k2)
+    }
+  }
+
+  implicit object ORMapEntryComparator extends KeyComparator[rd.ORMap.Entry] {
+    override def getKey(e: rd.ORMap.Entry): Any = if (e.hasStringKey) e.getStringKey else if (e.hasIntKey) e.getIntKey else if (e.hasLongKey) e.getLongKey else e.getOtherKey
+  }
+  implicit object LWWMapEntryComparator extends KeyComparator[rd.LWWMap.Entry] {
+    override def getKey(e: rd.LWWMap.Entry): Any = if (e.hasStringKey) e.getStringKey else if (e.hasIntKey) e.getIntKey else if (e.hasLongKey) e.getLongKey else e.getOtherKey
+  }
+  implicit object PNCounterMapEntryComparator extends KeyComparator[rd.PNCounterMap.Entry] {
+    override def getKey(e: rd.PNCounterMap.Entry): Any = if (e.hasStringKey) e.getStringKey else if (e.hasIntKey) e.getIntKey else if (e.hasLongKey) e.getLongKey else e.getOtherKey
+  }
+  implicit object ORMultiMapEntryComparator extends KeyComparator[rd.ORMultiMap.Entry] {
+    override def getKey(e: rd.ORMultiMap.Entry): Any = if (e.hasStringKey) e.getStringKey else if (e.hasIntKey) e.getIntKey else if (e.hasLongKey) e.getLongKey else e.getOtherKey
+  }
+
+  sealed trait ProtoMapEntryWriter[Entry <: GeneratedMessage, EntryBuilder <: GeneratedMessage.Builder[EntryBuilder], Value <: GeneratedMessage] {
+    def setStringKey(builder: EntryBuilder, key: String, value: Value): Entry
+    def setLongKey(builder: EntryBuilder, key: Long, value: Value): Entry
+    def setIntKey(builder: EntryBuilder, key: Int, value: Value): Entry
+    def setOtherKey(builder: EntryBuilder, key: dm.OtherMessage, value: Value): Entry
+  }
+
+  sealed trait ProtoMapEntryReader[Entry <: GeneratedMessage, A <: GeneratedMessage] {
+    def hasStringKey(entry: Entry): Boolean
+    def getStringKey(entry: Entry): String
+    def hasIntKey(entry: Entry): Boolean
+    def getIntKey(entry: Entry): Int
+    def hasLongKey(entry: Entry): Boolean
+    def getLongKey(entry: Entry): Long
+    def hasOtherKey(entry: Entry): Boolean
+    def getOtherKey(entry: Entry): dm.OtherMessage
+    def getValue(entry: Entry): A
+  }
+
+  implicit object ORMapEntry extends ProtoMapEntryWriter[rd.ORMap.Entry, rd.ORMap.Entry.Builder, dm.OtherMessage] with ProtoMapEntryReader[rd.ORMap.Entry, dm.OtherMessage] {
+    override def setStringKey(builder: rd.ORMap.Entry.Builder, key: String, value: dm.OtherMessage): rd.ORMap.Entry = builder.setStringKey(key).setValue(value).build()
+    override def setLongKey(builder: rd.ORMap.Entry.Builder, key: Long, value: dm.OtherMessage): rd.ORMap.Entry = builder.setLongKey(key).setValue(value).build()
+    override def setIntKey(builder: rd.ORMap.Entry.Builder, key: Int, value: dm.OtherMessage): rd.ORMap.Entry = builder.setIntKey(key).setValue(value).build()
+    override def setOtherKey(builder: rd.ORMap.Entry.Builder, key: dm.OtherMessage, value: dm.OtherMessage): rd.ORMap.Entry = builder.setOtherKey(key).setValue(value).build()
+    override def hasStringKey(entry: rd.ORMap.Entry): Boolean = entry.hasStringKey
+    override def getStringKey(entry: rd.ORMap.Entry): String = entry.getStringKey
+    override def hasIntKey(entry: rd.ORMap.Entry): Boolean = entry.hasIntKey
+    override def getIntKey(entry: rd.ORMap.Entry): Int = entry.getIntKey
+    override def hasLongKey(entry: rd.ORMap.Entry): Boolean = entry.hasLongKey
+    override def getLongKey(entry: rd.ORMap.Entry): Long = entry.getLongKey
+    override def hasOtherKey(entry: rd.ORMap.Entry): Boolean = entry.hasOtherKey
+    override def getOtherKey(entry: rd.ORMap.Entry): OtherMessage = entry.getOtherKey
+    override def getValue(entry: rd.ORMap.Entry): dm.OtherMessage = entry.getValue
+  }
+
+  implicit object LWWMapEntry extends ProtoMapEntryWriter[rd.LWWMap.Entry, rd.LWWMap.Entry.Builder, rd.LWWRegister] with ProtoMapEntryReader[rd.LWWMap.Entry, rd.LWWRegister] {
+    override def setStringKey(builder: rd.LWWMap.Entry.Builder, key: String, value: rd.LWWRegister): rd.LWWMap.Entry = builder.setStringKey(key).setValue(value).build()
+    override def setLongKey(builder: rd.LWWMap.Entry.Builder, key: Long, value: rd.LWWRegister): rd.LWWMap.Entry = builder.setLongKey(key).setValue(value).build()
+    override def setIntKey(builder: rd.LWWMap.Entry.Builder, key: Int, value: rd.LWWRegister): rd.LWWMap.Entry = builder.setIntKey(key).setValue(value).build()
+    override def setOtherKey(builder: rd.LWWMap.Entry.Builder, key: OtherMessage, value: rd.LWWRegister): rd.LWWMap.Entry = builder.setOtherKey(key).setValue(value).build()
+    override def hasStringKey(entry: rd.LWWMap.Entry): Boolean = entry.hasStringKey
+    override def getStringKey(entry: rd.LWWMap.Entry): String = entry.getStringKey
+    override def hasIntKey(entry: rd.LWWMap.Entry): Boolean = entry.hasIntKey
+    override def getIntKey(entry: rd.LWWMap.Entry): Int = entry.getIntKey
+    override def hasLongKey(entry: rd.LWWMap.Entry): Boolean = entry.hasLongKey
+    override def getLongKey(entry: rd.LWWMap.Entry): Long = entry.getLongKey
+    override def hasOtherKey(entry: rd.LWWMap.Entry): Boolean = entry.hasOtherKey
+    override def getOtherKey(entry: rd.LWWMap.Entry): OtherMessage = entry.getOtherKey
+    override def getValue(entry: rd.LWWMap.Entry): rd.LWWRegister = entry.getValue
+  }
+
+  implicit object PNCounterMapEntry extends ProtoMapEntryWriter[rd.PNCounterMap.Entry, rd.PNCounterMap.Entry.Builder, rd.PNCounter] with ProtoMapEntryReader[rd.PNCounterMap.Entry, rd.PNCounter] {
+    override def setStringKey(builder: rd.PNCounterMap.Entry.Builder, key: String, value: rd.PNCounter): rd.PNCounterMap.Entry = builder.setStringKey(key).setValue(value).build()
+    override def setLongKey(builder: rd.PNCounterMap.Entry.Builder, key: Long, value: rd.PNCounter): rd.PNCounterMap.Entry = builder.setLongKey(key).setValue(value).build()
+    override def setIntKey(builder: rd.PNCounterMap.Entry.Builder, key: Int, value: rd.PNCounter): rd.PNCounterMap.Entry = builder.setIntKey(key).setValue(value).build()
+    override def setOtherKey(builder: rd.PNCounterMap.Entry.Builder, key: OtherMessage, value: rd.PNCounter): rd.PNCounterMap.Entry = builder.setOtherKey(key).setValue(value).build()
+    override def hasStringKey(entry: rd.PNCounterMap.Entry): Boolean = entry.hasStringKey
+    override def getStringKey(entry: rd.PNCounterMap.Entry): String = entry.getStringKey
+    override def hasIntKey(entry: rd.PNCounterMap.Entry): Boolean = entry.hasIntKey
+    override def getIntKey(entry: rd.PNCounterMap.Entry): Int = entry.getIntKey
+    override def hasLongKey(entry: rd.PNCounterMap.Entry): Boolean = entry.hasLongKey
+    override def getLongKey(entry: rd.PNCounterMap.Entry): Long = entry.getLongKey
+    override def hasOtherKey(entry: rd.PNCounterMap.Entry): Boolean = entry.hasOtherKey
+    override def getOtherKey(entry: rd.PNCounterMap.Entry): OtherMessage = entry.getOtherKey
+    override def getValue(entry: rd.PNCounterMap.Entry): rd.PNCounter = entry.getValue
+  }
+
+  implicit object ORMultiMapEntry extends ProtoMapEntryWriter[rd.ORMultiMap.Entry, rd.ORMultiMap.Entry.Builder, rd.ORSet] with ProtoMapEntryReader[rd.ORMultiMap.Entry, rd.ORSet] {
+    override def setStringKey(builder: rd.ORMultiMap.Entry.Builder, key: String, value: rd.ORSet): rd.ORMultiMap.Entry = builder.setStringKey(key).setValue(value).build()
override def setLongKey(builder: rd.ORMultiMap.Entry.Builder, key: Long, value: rd.ORSet): rd.ORMultiMap.Entry = builder.setLongKey(key).setValue(value).build()
override def setIntKey(builder: rd.ORMultiMap.Entry.Builder, key: Int, value: rd.ORSet): rd.ORMultiMap.Entry = builder.setIntKey(key).setValue(value).build()
override def setOtherKey(builder: rd.ORMultiMap.Entry.Builder, key: dm.OtherMessage, value: rd.ORSet): rd.ORMultiMap.Entry = builder.setOtherKey(key).setValue(value).build()
override def hasStringKey(entry: rd.ORMultiMap.Entry): Boolean = entry.hasStringKey
override def getStringKey(entry: rd.ORMultiMap.Entry): String = entry.getStringKey
override def hasIntKey(entry: rd.ORMultiMap.Entry): Boolean = entry.hasIntKey
override def getIntKey(entry: rd.ORMultiMap.Entry): Int = entry.getIntKey
override def hasLongKey(entry: rd.ORMultiMap.Entry): Boolean = entry.hasLongKey
override def getLongKey(entry: rd.ORMultiMap.Entry): Long = entry.getLongKey
override def hasOtherKey(entry: rd.ORMultiMap.Entry): Boolean = entry.hasOtherKey
override def getOtherKey(entry: rd.ORMultiMap.Entry): OtherMessage = entry.getOtherKey
override def getValue(entry: rd.ORMultiMap.Entry): rd.ORSet = entry.getValue
}
}
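The implicit ProtoMapEntryWriter/ProtoMapEntryReader objects above form a typeclass: one generic serialization routine per direction, with the compiler picking the matching entry handler for each protobuf type (see getEntries and mapTypeFromProto further down in this diff). A minimal sketch of that dispatch pattern (hypothetical types, not the Akka API):

    trait EntryReader[E] {
      def key(e: E): Any
    }
    object EntryReader {
      implicit object StringPair extends EntryReader[(String, Int)] {
        def key(e: (String, Int)): Any = e._1
      }
      implicit object LongPair extends EntryReader[(Long, Int)] {
        def key(e: (Long, Int)): Any = e._1
      }
    }

    // One generic function serves every entry type with an implicit reader in scope.
    def keysOf[E](entries: List[E])(implicit r: EntryReader[E]): List[Any] =
      entries.map(r.key)

    // keysOf(List(("a", 1))) resolves StringPair; keysOf(List((1L, 1))) resolves LongPair.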
/** /**
* Protobuf serializer of ReplicatedData. * Protobuf serializer of ReplicatedData.
@ -29,6 +156,8 @@ import java.io.NotSerializableException
class ReplicatedDataSerializer(val system: ExtendedActorSystem) class ReplicatedDataSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest with SerializationSupport with BaseSerializer { extends SerializerWithStringManifest with SerializationSupport with BaseSerializer {
import ReplicatedDataSerializer._
private val DeletedDataManifest = "A" private val DeletedDataManifest = "A"
private val GSetManifest = "B" private val GSetManifest = "B"
private val GSetKeyManifest = "b" private val GSetKeyManifest = "b"
@ -78,48 +207,48 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
ORMultiMapKeyManifest (bytes ORMultiMapKey(keyIdFromBinary(bytes)))) ORMultiMapKeyManifest (bytes ORMultiMapKey(keyIdFromBinary(bytes))))
override def manifest(obj: AnyRef): String = obj match { override def manifest(obj: AnyRef): String = obj match {
case _: ORSet[_] ORSetManifest case _: ORSet[_] ORSetManifest
case _: GSet[_] GSetManifest case _: GSet[_] GSetManifest
case _: GCounter GCounterManifest case _: GCounter GCounterManifest
case _: PNCounter PNCounterManifest case _: PNCounter PNCounterManifest
case _: Flag FlagManifest case _: Flag FlagManifest
case _: LWWRegister[_] LWWRegisterManifest case _: LWWRegister[_] LWWRegisterManifest
case _: ORMap[_] ORMapManifest case _: ORMap[_, _] ORMapManifest
case _: LWWMap[_] LWWMapManifest case _: LWWMap[_, _] LWWMapManifest
case _: PNCounterMap PNCounterMapManifest case _: PNCounterMap[_] PNCounterMapManifest
case _: ORMultiMap[_] ORMultiMapManifest case _: ORMultiMap[_, _] ORMultiMapManifest
case DeletedData DeletedDataManifest case DeletedData DeletedDataManifest
case _: VersionVector VersionVectorManifest case _: VersionVector VersionVectorManifest
case _: ORSetKey[_] ORSetKeyManifest case _: ORSetKey[_] ORSetKeyManifest
case _: GSetKey[_] GSetKeyManifest case _: GSetKey[_] GSetKeyManifest
case _: GCounterKey GCounterKeyManifest case _: GCounterKey GCounterKeyManifest
case _: PNCounterKey PNCounterKeyManifest case _: PNCounterKey PNCounterKeyManifest
case _: FlagKey FlagKeyManifest case _: FlagKey FlagKeyManifest
case _: LWWRegisterKey[_] LWWRegisterKeyManifest case _: LWWRegisterKey[_] LWWRegisterKeyManifest
case _: ORMapKey[_] ORMapKeyManifest case _: ORMapKey[_, _] ORMapKeyManifest
case _: LWWMapKey[_] LWWMapKeyManifest case _: LWWMapKey[_, _] LWWMapKeyManifest
case _: PNCounterMapKey PNCounterMapKeyManifest case _: PNCounterMapKey[_] PNCounterMapKeyManifest
case _: ORMultiMapKey[_] ORMultiMapKeyManifest case _: ORMultiMapKey[_, _] ORMultiMapKeyManifest
case _ case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
} }
def toBinary(obj: AnyRef): Array[Byte] = obj match { def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: ORSet[_] compress(orsetToProto(m)) case m: ORSet[_] compress(orsetToProto(m))
case m: GSet[_] gsetToProto(m).toByteArray case m: GSet[_] gsetToProto(m).toByteArray
case m: GCounter gcounterToProto(m).toByteArray case m: GCounter gcounterToProto(m).toByteArray
case m: PNCounter pncounterToProto(m).toByteArray case m: PNCounter pncounterToProto(m).toByteArray
case m: Flag flagToProto(m).toByteArray case m: Flag flagToProto(m).toByteArray
case m: LWWRegister[_] lwwRegisterToProto(m).toByteArray case m: LWWRegister[_] lwwRegisterToProto(m).toByteArray
case m: ORMap[_] compress(ormapToProto(m)) case m: ORMap[_, _] compress(ormapToProto(m))
case m: LWWMap[_] compress(lwwmapToProto(m)) case m: LWWMap[_, _] compress(lwwmapToProto(m))
case m: PNCounterMap compress(pncountermapToProto(m)) case m: PNCounterMap[_] compress(pncountermapToProto(m))
case m: ORMultiMap[_] compress(multimapToProto(m)) case m: ORMultiMap[_, _] compress(multimapToProto(m))
case DeletedData dm.Empty.getDefaultInstance.toByteArray case DeletedData dm.Empty.getDefaultInstance.toByteArray
case m: VersionVector versionVectorToProto(m).toByteArray case m: VersionVector versionVectorToProto(m).toByteArray
case Key(id) keyIdToBinary(id) case Key(id) keyIdToBinary(id)
case _ case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
} }
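The manifest/toBinary pair above can be driven directly, which is how the serializer specs later in this diff verify round-trips. A hedged usage sketch (assumes an ActorSystem can be started):

    import akka.actor.{ ActorSystem, ExtendedActorSystem }
    import akka.cluster.ddata.GSet
    import akka.cluster.ddata.protobuf.ReplicatedDataSerializer

    val system = ActorSystem("sketch")
    val serializer = new ReplicatedDataSerializer(system.asInstanceOf[ExtendedActorSystem])
    val data = GSet.empty[String] + "a" + "b"
    val blob = serializer.toBinary(data)                              // dispatches to gsetToProto
    val back = serializer.fromBinary(blob, serializer.manifest(data)) // dispatches on the manifest
    assert(back == data)
    system.terminate()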
@ -328,83 +457,88 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
} }
} }
def ormapToProto(ormap: ORMap[_]): rd.ORMap = { /*
val b = rd.ORMap.newBuilder().setKeys(orsetToProto(ormap.keys)) * Convert a Map[A, B] to an Iterable[Entry] where Entry is the protobuf map entry.
ormap.entries.toVector.sortBy { case (key, _) key }.foreach { */
case (key, value) b.addEntries(rd.ORMap.Entry.newBuilder(). private def getEntries[IKey, IValue, EntryBuilder <: GeneratedMessage.Builder[EntryBuilder], PEntry <: GeneratedMessage, PValue <: GeneratedMessage](input: Map[IKey, IValue], createBuilder: () EntryBuilder, valueConverter: IValue PValue)(implicit comparator: Comparator[PEntry], eh: ProtoMapEntryWriter[PEntry, EntryBuilder, PValue]): java.lang.Iterable[PEntry] = {
setKey(key).setValue(otherMessageToProto(value))) // The resulting Iterable needs to be ordered deterministically in order to create same signature upon serializing same data
val protoEntries = new TreeSet[PEntry](comparator)
input.foreach {
case (key: String, value) protoEntries.add(eh.setStringKey(createBuilder(), key, valueConverter(value)))
case (key: Int, value) protoEntries.add(eh.setIntKey(createBuilder(), key, valueConverter(value)))
case (key: Long, value) protoEntries.add(eh.setLongKey(createBuilder(), key, valueConverter(value)))
case (key, value) protoEntries.add(eh.setOtherKey(createBuilder(), otherMessageToProto(key), valueConverter(value)))
} }
b.build() protoEntries
} }
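The TreeSet in getEntries is what makes the serialized signature stable: whatever iteration order the input Map happens to have, entries are emitted in comparator order. A tiny illustration of that property with plain strings instead of protobuf entries:

    import java.util.{ Comparator, TreeSet }
    import scala.collection.JavaConverters._

    val ordered = new TreeSet[String](Comparator.naturalOrder[String]())
    Map("b" -> 2, "c" -> 3, "a" -> 1).keys.foreach(ordered.add)
    // Iteration order is now a, b, c regardless of the Map's internal ordering.
    assert(ordered.iterator.asScala.toList == List("a", "b", "c"))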
def ormapFromBinary(bytes: Array[Byte]): ORMap[ReplicatedData] = def ormapToProto(ormap: ORMap[_, _]): rd.ORMap = {
val entries: jl.Iterable[rd.ORMap.Entry] = getEntries(ormap.values, rd.ORMap.Entry.newBuilder, otherMessageToProto)
rd.ORMap.newBuilder().setKeys(orsetToProto(ormap.keys)).addAllEntries(entries).build()
}
def ormapFromBinary(bytes: Array[Byte]): ORMap[Any, ReplicatedData] =
ormapFromProto(rd.ORMap.parseFrom(decompress(bytes))) ormapFromProto(rd.ORMap.parseFrom(decompress(bytes)))
def ormapFromProto(ormap: rd.ORMap): ORMap[ReplicatedData] = { def mapTypeFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](input: util.List[PEntry], valueCreator: A B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
val entries = ormap.getEntriesList.asScala.map(entry input.asScala.map { entry
entry.getKey otherMessageFromProto(entry.getValue).asInstanceOf[ReplicatedData]).toMap if (eh.hasStringKey(entry)) eh.getStringKey(entry) valueCreator(eh.getValue(entry))
else if (eh.hasIntKey(entry)) eh.getIntKey(entry) valueCreator(eh.getValue(entry))
else if (eh.hasLongKey(entry)) eh.getLongKey(entry) valueCreator(eh.getValue(entry))
else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry)) valueCreator(eh.getValue(entry))
else throw new IllegalArgumentException(s"Can't deserialize ${entry.getClass} because it does not have any key in the serialized message.")
}.toMap
}
def ormapFromProto(ormap: rd.ORMap): ORMap[Any, ReplicatedData] = {
val entries = mapTypeFromProto(ormap.getEntriesList, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
new ORMap( new ORMap(
keys = orsetFromProto(ormap.getKeys).asInstanceOf[ORSet[String]], keys = orsetFromProto(ormap.getKeys),
entries) entries)
} }
def lwwmapToProto(lwwmap: LWWMap[_]): rd.LWWMap = { def lwwmapToProto(lwwmap: LWWMap[_, _]): rd.LWWMap = {
val b = rd.LWWMap.newBuilder().setKeys(orsetToProto(lwwmap.underlying.keys)) val entries: jl.Iterable[rd.LWWMap.Entry] = getEntries(lwwmap.underlying.entries, rd.LWWMap.Entry.newBuilder, lwwRegisterToProto)
lwwmap.underlying.entries.toVector.sortBy { case (key, _) key }.foreach { rd.LWWMap.newBuilder().setKeys(orsetToProto(lwwmap.underlying.keys)).addAllEntries(entries).build()
case (key, value) b.addEntries(rd.LWWMap.Entry.newBuilder().
setKey(key).setValue(lwwRegisterToProto(value)))
}
b.build()
} }
def lwwmapFromBinary(bytes: Array[Byte]): LWWMap[Any] = def lwwmapFromBinary(bytes: Array[Byte]): LWWMap[Any, Any] =
lwwmapFromProto(rd.LWWMap.parseFrom(decompress(bytes))) lwwmapFromProto(rd.LWWMap.parseFrom(decompress(bytes)))
def lwwmapFromProto(lwwmap: rd.LWWMap): LWWMap[Any] = { def lwwmapFromProto(lwwmap: rd.LWWMap): LWWMap[Any, Any] = {
val entries = lwwmap.getEntriesList.asScala.map(entry val entries = mapTypeFromProto(lwwmap.getEntriesList, lwwRegisterFromProto)
entry.getKey lwwRegisterFromProto(entry.getValue)).toMap
new LWWMap(new ORMap( new LWWMap(new ORMap(
keys = orsetFromProto(lwwmap.getKeys).asInstanceOf[ORSet[String]], keys = orsetFromProto(lwwmap.getKeys),
entries)) entries))
} }
def pncountermapToProto(pncountermap: PNCounterMap): rd.PNCounterMap = { def pncountermapToProto(pncountermap: PNCounterMap[_]): rd.PNCounterMap = {
val b = rd.PNCounterMap.newBuilder().setKeys(orsetToProto(pncountermap.underlying.keys)) val entries: jl.Iterable[rd.PNCounterMap.Entry] = getEntries(pncountermap.underlying.entries, rd.PNCounterMap.Entry.newBuilder, pncounterToProto)
pncountermap.underlying.entries.toVector.sortBy { case (key, _) key }.foreach { rd.PNCounterMap.newBuilder().setKeys(orsetToProto(pncountermap.underlying.keys)).addAllEntries(entries).build()
case (key, value: PNCounter) b.addEntries(rd.PNCounterMap.Entry.newBuilder().
setKey(key).setValue(pncounterToProto(value)))
}
b.build()
} }
def pncountermapFromBinary(bytes: Array[Byte]): PNCounterMap = def pncountermapFromBinary(bytes: Array[Byte]): PNCounterMap[_] =
pncountermapFromProto(rd.PNCounterMap.parseFrom(decompress(bytes))) pncountermapFromProto(rd.PNCounterMap.parseFrom(decompress(bytes)))
def pncountermapFromProto(pncountermap: rd.PNCounterMap): PNCounterMap = { def pncountermapFromProto(pncountermap: rd.PNCounterMap): PNCounterMap[_] = {
val entries = pncountermap.getEntriesList.asScala.map(entry val entries = mapTypeFromProto(pncountermap.getEntriesList, pncounterFromProto)
entry.getKey pncounterFromProto(entry.getValue)).toMap
new PNCounterMap(new ORMap( new PNCounterMap(new ORMap(
keys = orsetFromProto(pncountermap.getKeys).asInstanceOf[ORSet[String]], keys = orsetFromProto(pncountermap.getKeys),
entries)) entries))
} }
def multimapToProto(multimap: ORMultiMap[_]): rd.ORMultiMap = { def multimapToProto(multimap: ORMultiMap[_, _]): rd.ORMultiMap = {
val b = rd.ORMultiMap.newBuilder().setKeys(orsetToProto(multimap.underlying.keys)) val entries: jl.Iterable[rd.ORMultiMap.Entry] = getEntries(multimap.underlying.entries, rd.ORMultiMap.Entry.newBuilder, orsetToProto)
multimap.underlying.entries.toVector.sortBy { case (key, _) key }.foreach { rd.ORMultiMap.newBuilder().setKeys(orsetToProto(multimap.underlying.keys)).addAllEntries(entries).build()
case (key, value) b.addEntries(rd.ORMultiMap.Entry.newBuilder().
setKey(key).setValue(orsetToProto(value)))
}
b.build()
} }
def multimapFromBinary(bytes: Array[Byte]): ORMultiMap[Any] = def multimapFromBinary(bytes: Array[Byte]): ORMultiMap[Any, Any] =
multimapFromProto(rd.ORMultiMap.parseFrom(decompress(bytes))) multimapFromProto(rd.ORMultiMap.parseFrom(decompress(bytes)))
def multimapFromProto(multimap: rd.ORMultiMap): ORMultiMap[Any] = { def multimapFromProto(multimap: rd.ORMultiMap): ORMultiMap[Any, Any] = {
val entries = multimap.getEntriesList.asScala.map(entry val entries = mapTypeFromProto(multimap.getEntriesList, orsetFromProto)
entry.getKey orsetFromProto(entry.getValue)).toMap
new ORMultiMap(new ORMap( new ORMultiMap(new ORMap(
keys = orsetFromProto(multimap.getKeys).asInstanceOf[ORSet[String]], keys = orsetFromProto(multimap.getKeys),
entries)) entries))
} }
View file
@ -46,7 +46,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
val KeyA = GCounterKey("A") val KeyA = GCounterKey("A")
val KeyB = ORSetKey[String]("B") val KeyB = ORSetKey[String]("B")
val KeyC = PNCounterMapKey("C") val KeyC = PNCounterMapKey[String]("C")
def join(from: RoleName, to: RoleName): Unit = { def join(from: RoleName, to: RoleName): Unit = {
runOn(from) { runOn(from) {
@ -86,7 +86,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c") replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c")
expectMsg(UpdateSuccess(KeyB, None)) expectMsg(UpdateSuccess(KeyB, None))
replicator ! Update(KeyC, PNCounterMap(), WriteAll(timeout))(_ increment "x" increment "y") replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" }
expectMsg(UpdateSuccess(KeyC, None)) expectMsg(UpdateSuccess(KeyC, None))
enterBarrier("updates-done") enterBarrier("updates-done")
@ -100,7 +100,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
oldSet.elements should be(Set("a", "b", "c")) oldSet.elements should be(Set("a", "b", "c"))
replicator ! Get(KeyC, ReadLocal) replicator ! Get(KeyC, ReadLocal)
val oldMap = expectMsgType[GetSuccess[PNCounterMap]].dataValue val oldMap = expectMsgType[GetSuccess[PNCounterMap[String]]].dataValue
oldMap.get("x") should be(Some(3)) oldMap.get("x") should be(Some(3))
oldMap.get("y") should be(Some(3)) oldMap.get("y") should be(Some(3))
View file
@ -57,7 +57,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
val KeyE2 = GCounterKey("E2") val KeyE2 = GCounterKey("E2")
val KeyF = GCounterKey("F") val KeyF = GCounterKey("F")
val KeyG = ORSetKey[String]("G") val KeyG = ORSetKey[String]("G")
val KeyH = ORMapKey[Flag]("H") val KeyH = ORMapKey[String, Flag]("H")
val KeyI = GSetKey[String]("I") val KeyI = GSetKey[String]("I")
val KeyJ = GSetKey[String]("J") val KeyJ = GSetKey[String]("J")
val KeyX = GCounterKey("X") val KeyX = GCounterKey("X")
@ -526,20 +526,20 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
runOn(second) { runOn(second) {
replicator ! Subscribe(KeyH, changedProbe.ref) replicator ! Subscribe(KeyH, changedProbe.ref)
replicator ! Update(KeyH, ORMap.empty[Flag], writeTwo)(_ + ("a" Flag(enabled = false))) replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("a" Flag(enabled = false)))
changedProbe.expectMsgPF() { case c @ Changed(KeyH) c.get(KeyH).entries } should be(Map("a" Flag(enabled = false))) changedProbe.expectMsgPF() { case c @ Changed(KeyH) c.get(KeyH).entries } should be(Map("a" Flag(enabled = false)))
} }
enterBarrier("update-h1") enterBarrier("update-h1")
runOn(first) { runOn(first) {
replicator ! Update(KeyH, ORMap.empty[Flag], writeTwo)(_ + ("a" Flag(enabled = true))) replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("a" Flag(enabled = true)))
} }
runOn(second) { runOn(second) {
changedProbe.expectMsgPF() { case c @ Changed(KeyH) c.get(KeyH).entries } should be(Map("a" Flag(enabled = true))) changedProbe.expectMsgPF() { case c @ Changed(KeyH) c.get(KeyH).entries } should be(Map("a" Flag(enabled = true)))
replicator ! Update(KeyH, ORMap.empty[Flag], writeTwo)(_ + ("b" Flag(enabled = true))) replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("b" Flag(enabled = true)))
changedProbe.expectMsgPF() { case c @ Changed(KeyH) c.get(KeyH).entries } should be( changedProbe.expectMsgPF() { case c @ Changed(KeyH) c.get(KeyH).entries } should be(
Map("a" Flag(enabled = true), "b" Flag(enabled = true))) Map("a" Flag(enabled = true), "b" Flag(enabled = true)))
} }
View file
@ -19,7 +19,7 @@ class LWWMapSpec extends WordSpec with Matchers {
"A LWWMap" must { "A LWWMap" must {
"be able to set entries" in { "be able to set entries" in {
val m = LWWMap.empty[Int].put(node1, "a", 1, defaultClock[Int]).put(node2, "b", 2, defaultClock[Int]) val m = LWWMap.empty[String, Int].put(node1, "a", 1, defaultClock[Int]).put(node2, "b", 2, defaultClock[Int])
m.entries should be(Map("a" 1, "b" 2)) m.entries should be(Map("a" 1, "b" 2))
} }
@ -51,7 +51,7 @@ class LWWMapSpec extends WordSpec with Matchers {
val m1 = LWWMap.empty.put(node1, "a", 1L, defaultClock[Long]) val m1 = LWWMap.empty.put(node1, "a", 1L, defaultClock[Long])
val LWWMap(entries1) = m1 val LWWMap(entries1) = m1
val entries2: Map[String, Long] = entries1 val entries2: Map[String, Long] = entries1
Changed(LWWMapKey[Long]("key"))(m1) match { Changed(LWWMapKey[String, Long]("key"))(m1) match {
case c @ Changed(LWWMapKey("key")) case c @ Changed(LWWMapKey("key"))
val LWWMap(entries3) = c.dataValue val LWWMap(entries3) = c.dataValue
val entries4: Map[String, Long] = entries3 val entries4: Map[String, Long] = entries3
View file
@ -119,11 +119,11 @@ class ORMapSpec extends WordSpec with Matchers {
} }
"be able to update entry" in { "be able to update entry" in {
val m1 = ORMap.empty[ORSet[String]].put(node1, "a", ORSet.empty.add(node1, "A")) val m1 = ORMap.empty[String, ORSet[String]].put(node1, "a", ORSet.empty.add(node1, "A"))
.put(node1, "b", ORSet.empty.add(node1, "B01").add(node1, "B02").add(node1, "B03")) .put(node1, "b", ORSet.empty.add(node1, "B01").add(node1, "B02").add(node1, "B03"))
val m2 = ORMap.empty[ORSet[String]].put(node2, "c", ORSet.empty.add(node2, "C")) val m2 = ORMap.empty[String, ORSet[String]].put(node2, "c", ORSet.empty.add(node2, "C"))
val merged1: ORMap[ORSet[String]] = m1 merge m2 val merged1: ORMap[String, ORSet[String]] = m1 merge m2
val m3 = merged1.updated(node1, "b", ORSet.empty[String])(_.clear(node1).add(node1, "B2")) val m3 = merged1.updated(node1, "b", ORSet.empty[String])(_.clear(node1).add(node1, "B2"))
@ -140,11 +140,11 @@ class ORMapSpec extends WordSpec with Matchers {
} }
"be able to update ORSet entry with remove+put" in { "be able to update ORSet entry with remove+put" in {
val m1 = ORMap.empty[ORSet[String]].put(node1, "a", ORSet.empty.add(node1, "A01")) val m1 = ORMap.empty[String, ORSet[String]].put(node1, "a", ORSet.empty.add(node1, "A01"))
.updated(node1, "a", ORSet.empty[String])(_.add(node1, "A02")) .updated(node1, "a", ORSet.empty[String])(_.add(node1, "A02"))
.updated(node1, "a", ORSet.empty[String])(_.add(node1, "A03")) .updated(node1, "a", ORSet.empty[String])(_.add(node1, "A03"))
.put(node1, "b", ORSet.empty.add(node1, "B01").add(node1, "B02").add(node1, "B03")) .put(node1, "b", ORSet.empty.add(node1, "B01").add(node1, "B02").add(node1, "B03"))
val m2 = ORMap.empty[ORSet[String]].put(node2, "c", ORSet.empty.add(node2, "C")) val m2 = ORMap.empty[String, ORSet[String]].put(node2, "c", ORSet.empty.add(node2, "C"))
val merged1 = m1 merge m2 val merged1 = m1 merge m2
@ -190,10 +190,10 @@ class ORMapSpec extends WordSpec with Matchers {
"have unapply extractor" in { "have unapply extractor" in {
val m1 = ORMap.empty.put(node1, "a", Flag(true)).put(node2, "b", Flag(false)) val m1 = ORMap.empty.put(node1, "a", Flag(true)).put(node2, "b", Flag(false))
val m2: ORMap[Flag] = m1 val m2: ORMap[String, Flag] = m1
val ORMap(entries1) = m1 val ORMap(entries1) = m1
val entries2: Map[String, Flag] = entries1 val entries2: Map[String, Flag] = entries1
Changed(ORMapKey[Flag]("key"))(m1) match { Changed(ORMapKey[String, Flag]("key"))(m1) match {
case c @ Changed(ORMapKey("key")) case c @ Changed(ORMapKey("key"))
val ORMap(entries3) = c.dataValue val ORMap(entries3) = c.dataValue
val entries4: Map[String, ReplicatedData] = entries3 val entries4: Map[String, ReplicatedData] = entries3
View file
@ -109,10 +109,10 @@ class ORMultiMapSpec extends WordSpec with Matchers {
"have unapply extractor" in { "have unapply extractor" in {
val m1 = ORMultiMap.empty.put(node1, "a", Set(1L, 2L)).put(node2, "b", Set(3L)) val m1 = ORMultiMap.empty.put(node1, "a", Set(1L, 2L)).put(node2, "b", Set(3L))
val m2: ORMultiMap[Long] = m1 val m2: ORMultiMap[String, Long] = m1
val ORMultiMap(entries1) = m1 val ORMultiMap(entries1) = m1
val entries2: Map[String, Set[Long]] = entries1 val entries2: Map[String, Set[Long]] = entries1
Changed(ORMultiMapKey[Long]("key"))(m1) match { Changed(ORMultiMapKey[String, Long]("key"))(m1) match {
case c @ Changed(ORMultiMapKey("key")) case c @ Changed(ORMultiMapKey("key"))
val ORMultiMap(entries3) = c.dataValue val ORMultiMap(entries3) = c.dataValue
val entries4: Map[String, Set[Long]] = entries3 val entries4: Map[String, Set[Long]] = entries3
View file
@ -50,7 +50,7 @@ class PNCounterMapSpec extends WordSpec with Matchers {
val m1 = PNCounterMap.empty.increment(node1, "a", 1).increment(node2, "b", 2) val m1 = PNCounterMap.empty.increment(node1, "a", 1).increment(node2, "b", 2)
val PNCounterMap(entries1) = m1 val PNCounterMap(entries1) = m1
val entries2: Map[String, BigInt] = entries1 val entries2: Map[String, BigInt] = entries1
Changed(PNCounterMapKey("key"))(m1) match { Changed(PNCounterMapKey[String]("key"))(m1) match {
case c @ Changed(PNCounterMapKey("key")) case c @ Changed(PNCounterMapKey("key"))
val PNCounterMap(entries3) = c.dataValue val PNCounterMap(entries3) = c.dataValue
val entries4: Map[String, BigInt] = entries3 val entries4: Map[String, BigInt] = entries3
View file
@ -3,24 +3,16 @@
*/ */
package akka.cluster.ddata.protobuf package akka.cluster.ddata.protobuf
import java.util.Base64
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers import org.scalatest.Matchers
import org.scalatest.WordSpecLike import org.scalatest.WordSpecLike
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.Address import akka.actor.Address
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.cluster.ddata.Flag import akka.cluster.ddata._
import akka.cluster.ddata.GCounter
import akka.cluster.ddata.GSet
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.LWWRegister
import akka.cluster.ddata.ORMap
import akka.cluster.ddata.ORMultiMap
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.PNCounter
import akka.cluster.ddata.PNCounterMap
import akka.cluster.ddata.Replicator.Internal._ import akka.cluster.ddata.Replicator.Internal._
import akka.cluster.ddata.VersionVector
import akka.testkit.TestKit import akka.testkit.TestKit
import akka.cluster.UniqueAddress import akka.cluster.UniqueAddress
import akka.remote.RARP import akka.remote.RARP
@ -46,6 +38,17 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
shutdown() shutdown()
} }
/**
* Given a blob created with the previous serializer (which only supported string keys for maps): if we deserialize it
* and then serialize it again and arrive at the same blob, we can assume that the serializers are compatible in both directions.
*/
def checkCompatibility(oldBlobAsBase64: String, obj: AnyRef): Unit = {
val oldBlob = Base64.getDecoder.decode(oldBlobAsBase64)
val deserialized = serializer.fromBinary(oldBlob, serializer.manifest(obj))
val newBlob = serializer.toBinary(deserialized)
newBlob should equal(oldBlob)
}
def checkSerialization(obj: AnyRef): Unit = { def checkSerialization(obj: AnyRef): Unit = {
val blob = serializer.toBinary(obj) val blob = serializer.toBinary(obj)
val ref = serializer.fromBinary(blob, serializer.manifest(obj)) val ref = serializer.fromBinary(blob, serializer.manifest(obj))
@ -140,38 +143,73 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
} }
"serialize ORMap" in { "serialize ORMap" in {
checkSerialization(ORMap())
checkSerialization(ORMap().put(address1, "a", GSet() + "A"))
checkSerialization(ORMap().put(address1, "a", GSet() + "A").put(address2, "b", GSet() + "B")) checkSerialization(ORMap().put(address1, "a", GSet() + "A").put(address2, "b", GSet() + "B"))
checkSerialization(ORMap().put(address1, 1, GSet() + "A"))
checkSerialization(ORMap().put(address1, 1L, GSet() + "A"))
// use Flag as the object key in this test because it is serializable
checkSerialization(ORMap().put(address1, Flag(), GSet() + "A"))
}
"be compatible with old ORMap serialization" in {
// Below blob was created with previous version of the serializer
val oldBlobAsBase64 = "H4sIAAAAAAAAAOOax8jlyaXMJc8lzMWXX5KRWqSXkV9copdflC7wXEWUiYGBQRaIGQQkuJS45LiEuHiL83NTUdQwwtWIC6kQpUqVKAulGBOlGJOE+LkYE4W4uJi5GB0FuJUYnUACSRABJ7AAAOLO3C3DAAAA"
checkCompatibility(oldBlobAsBase64, ORMap())
} }
"serialize LWWMap" in { "serialize LWWMap" in {
checkSerialization(LWWMap()) checkSerialization(LWWMap())
checkSerialization(LWWMap().put(address1, "a", "value1", LWWRegister.defaultClock[Any])) checkSerialization(LWWMap().put(address1, "a", "value1", LWWRegister.defaultClock[Any]))
checkSerialization(LWWMap().put(address1, 1, "value1", LWWRegister.defaultClock[Any]))
checkSerialization(LWWMap().put(address1, 1L, "value1", LWWRegister.defaultClock[Any]))
checkSerialization(LWWMap().put(address1, Flag(), "value1", LWWRegister.defaultClock[Any]))
checkSerialization(LWWMap().put(address1, "a", "value1", LWWRegister.defaultClock[Any]) checkSerialization(LWWMap().put(address1, "a", "value1", LWWRegister.defaultClock[Any])
.put(address2, "b", 17, LWWRegister.defaultClock[Any])) .put(address2, "b", 17, LWWRegister.defaultClock[Any]))
} }
"be compatible with old LWWMap serialization" in {
// Below blob was created with previous version of the serializer
val oldBlobAsBase64 = "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwC0kJEqZJiTBSy5wISVhwzrl2fuyRMiIAWKUEu3jVvGVhLGNjKEnNKUw0FGAG1K/3VkgAAAA=="
checkCompatibility(oldBlobAsBase64, LWWMap())
}
"serialize PNCounterMap" in { "serialize PNCounterMap" in {
checkSerialization(PNCounterMap()) checkSerialization(PNCounterMap())
checkSerialization(PNCounterMap().increment(address1, "a", 3)) checkSerialization(PNCounterMap().increment(address1, "a", 3))
checkSerialization(PNCounterMap().increment(address1, 1, 3))
checkSerialization(PNCounterMap().increment(address1, 1L, 3))
checkSerialization(PNCounterMap().increment(address1, Flag(), 3))
checkSerialization(PNCounterMap().increment(address1, "a", 3).decrement(address2, "a", 2). checkSerialization(PNCounterMap().increment(address1, "a", 3).decrement(address2, "a", 2).
increment(address2, "b", 5)) increment(address2, "b", 5))
} }
"be compatible with old PNCounterMap serialization" in {
// Below blob was created with previous version of the serializer
val oldBlobAsBase64 = "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwC8kJEqZJiTBTS4wISmlyqXMqE1AsxMgsxAADYQs/9gQAAAA=="
checkCompatibility(oldBlobAsBase64, PNCounterMap())
}
"serialize ORMultiMap" in { "serialize ORMultiMap" in {
checkSerialization(ORMultiMap()) checkSerialization(ORMultiMap())
checkSerialization(ORMultiMap().addBinding(address1, "a", "A")) checkSerialization(ORMultiMap().addBinding(address1, "a", "A"))
checkSerialization(ORMultiMap.empty[String] checkSerialization(ORMultiMap().addBinding(address1, 1, "A"))
checkSerialization(ORMultiMap().addBinding(address1, 1L, "A"))
checkSerialization(ORMultiMap().addBinding(address1, Flag(), "A"))
checkSerialization(ORMultiMap.empty[String, String]
.addBinding(address1, "a", "A1") .addBinding(address1, "a", "A1")
.put(address2, "b", Set("B1", "B2", "B3")) .put(address2, "b", Set("B1", "B2", "B3"))
.addBinding(address2, "a", "A2")) .addBinding(address2, "a", "A2"))
val m1 = ORMultiMap.empty[String].addBinding(address1, "a", "A1").addBinding(address2, "a", "A2") val m1 = ORMultiMap.empty[String, String].addBinding(address1, "a", "A1").addBinding(address2, "a", "A2")
val m2 = ORMultiMap.empty[String].put(address2, "b", Set("B1", "B2", "B3")) val m2 = ORMultiMap.empty[String, String].put(address2, "b", Set("B1", "B2", "B3"))
checkSameContent(m1.merge(m2), m2.merge(m1)) checkSameContent(m1.merge(m2), m2.merge(m1))
} }
"be compatible with old ORMultiMap serialization" in {
// Below blob was created with previous version of the serializer
val oldBlobAsBase64 = "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwCakJEqZJiTBQK4QISxJmqSpSpqlKMjgDlsHjDpwAAAA=="
checkCompatibility(oldBlobAsBase64, ORMultiMap())
}
"serialize DeletedData" in { "serialize DeletedData" in {
checkSerialization(DeletedData) checkSerialization(DeletedData)
} }
View file
@ -301,10 +301,10 @@ public class DistributedDataDocTest extends AbstractJavaTest {
public void demonstratePNCounterMap() { public void demonstratePNCounterMap() {
//#pncountermap //#pncountermap
final Cluster node = Cluster.get(system); final Cluster node = Cluster.get(system);
final PNCounterMap m0 = PNCounterMap.create(); final PNCounterMap<String> m0 = PNCounterMap.create();
final PNCounterMap m1 = m0.increment(node, "a", 7); final PNCounterMap<String> m1 = m0.increment(node, "a", 7);
final PNCounterMap m2 = m1.decrement(node, "a", 2); final PNCounterMap<String> m2 = m1.decrement(node, "a", 2);
final PNCounterMap m3 = m2.increment(node, "b", 1); final PNCounterMap<String> m3 = m2.increment(node, "b", 1);
System.out.println(m3.get("a")); // 5 System.out.println(m3.get("a")); // 5
System.out.println(m3.getEntries()); System.out.println(m3.getEntries());
//#pncountermap //#pncountermap
@ -334,12 +334,12 @@ public class DistributedDataDocTest extends AbstractJavaTest {
public void demonstrateORMultiMap() { public void demonstrateORMultiMap() {
//#ormultimap //#ormultimap
final Cluster node = Cluster.get(system); final Cluster node = Cluster.get(system);
final ORMultiMap<Integer> m0 = ORMultiMap.create(); final ORMultiMap<String, Integer> m0 = ORMultiMap.create();
final ORMultiMap<Integer> m1 = m0.put(node, "a", final ORMultiMap<String, Integer> m1 = m0.put(node, "a",
new HashSet<Integer>(Arrays.asList(1, 2, 3))); new HashSet<>(Arrays.asList(1, 2, 3)));
final ORMultiMap<Integer> m2 = m1.addBinding(node, "a", 4); final ORMultiMap<String, Integer> m2 = m1.addBinding(node, "a", 4);
final ORMultiMap<Integer> m3 = m2.removeBinding(node, "a", 2); final ORMultiMap<String, Integer> m3 = m2.removeBinding(node, "a", 2);
final ORMultiMap<Integer> m4 = m3.addBinding(node, "b", 1); final ORMultiMap<String, Integer> m4 = m3.addBinding(node, "b", 1);
System.out.println(m4.getEntries()); System.out.println(m4.getEntries());
//#ormultimap //#ormultimap
} }
View file
@ -317,7 +317,7 @@ track causality of the operations and resolve concurrent updates.
Maps Maps
---- ----
``ORMap`` (observed-remove map) is a map with ``String`` keys and the values are ``ReplicatedData`` ``ORMap`` (observed-remove map) is a map with keys of ``Any`` type and the values are ``ReplicatedData``
types themselves. It supports add, remove and delete any number of times for a map entry. types themselves. It supports add, remove and delete any number of times for a map entry.
If an entry is concurrently added and removed, the add will win. You cannot remove an entry that If an entry is concurrently added and removed, the add will win. You cannot remove an entry that
View file
@ -169,3 +169,18 @@ the Agents, as they rarely are really enough and do not fit the Akka spirit of t
We also anticipate replacing the uses of Agents with the upcoming Akka Typed, so in preparation thereof the Agents have been deprecated in 2.5. We also anticipate replacing the uses of Agents with the upcoming Akka Typed, so in preparation thereof the Agents have been deprecated in 2.5.
If you use Agents and would like to take over the maintenance thereof, please contact the team on Gitter or GitHub. If you use Agents and would like to take over the maintenance thereof, please contact the team on Gitter or GitHub.
Distributed Data
================
Maps allow generic types for the keys
-------------------------------------
In 2.4 the key of any Distributed Data map always needed to be of type String. In 2.5 you can use any type for the key. This means that
every map (ORMap, LWWMap, PNCounterMap, ORMultiMap) now takes an extra type parameter to specify the key type. To migrate
existing code from 2.4 to 2.5 you simply add String as the key type, for example: `ORMultiMap[Foo]` becomes `ORMultiMap[String, Foo]`.
`PNCounterMap` didn't take a type parameter in version 2.4, so `PNCounterMap` in 2.4 becomes `PNCounterMap[String]` in 2.5.
Java developers should use `<>` instead of `[]`, e.g. `PNCounterMap<String>`.
**NOTE: Even though the interface is not compatible between 2.4 and 2.5, the binary protocol over the wire is (as long
as you use String as the key type). This means that 2.4 nodes can synchronize with 2.5 nodes.**
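A hedged before/after sketch of the migration (`LineItem` is a placeholder value type, not from this patch):

    import akka.cluster.ddata.ORMultiMap

    final case class LineItem(productId: String, quantity: Int) // hypothetical value type

    // Akka 2.4:  ORMultiMap.empty[LineItem]
    // Akka 2.5:  the key type (String here) becomes the first type parameter
    val cart: ORMultiMap[String, LineItem] = ORMultiMap.empty[String, LineItem]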
View file
@ -301,7 +301,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
def println(o: Any): Unit = () def println(o: Any): Unit = ()
//#pncountermap //#pncountermap
implicit val node = Cluster(system) implicit val node = Cluster(system)
val m0 = PNCounterMap.empty val m0 = PNCounterMap.empty[String]
val m1 = m0.increment("a", 7) val m1 = m0.increment("a", 7)
val m2 = m1.decrement("a", 2) val m2 = m1.decrement("a", 2)
val m3 = m2.increment("b", 1) val m3 = m2.increment("b", 1)
@ -337,7 +337,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
def println(o: Any): Unit = () def println(o: Any): Unit = ()
//#ormultimap //#ormultimap
implicit val node = Cluster(system) implicit val node = Cluster(system)
val m0 = ORMultiMap.empty[Int] val m0 = ORMultiMap.empty[String, Int]
val m1 = m0 + ("a" -> Set(1, 2, 3)) val m1 = m0 + ("a" -> Set(1, 2, 3))
val m2 = m1.addBinding("a", 4) val m2 = m1.addBinding("a", 4)
val m3 = m2.removeBinding("a", 2) val m3 = m2.removeBinding("a", 2)
View file
@ -317,7 +317,7 @@ track causality of the operations and resolve concurrent updates.
Maps Maps
---- ----
``ORMap`` (observed-remove map) is a map with ``String`` keys and the values are ``ReplicatedData`` ``ORMap`` (observed-remove map) is a map with keys of ``Any`` type and the values are ``ReplicatedData``
types themselves. It supports add, remove and delete any number of times for a map entry. types themselves. It supports add, remove and delete any number of times for a map entry.
If an entry is concurrently added and removed, the add will win. You cannot remove an entry that If an entry is concurrently added and removed, the add will win. You cannot remove an entry that
@ -333,8 +333,8 @@ such as the following specialized maps.
``ORMultiMap`` (observed-remove multi-map) is a multi-map implementation that wraps an ``ORMultiMap`` (observed-remove multi-map) is a multi-map implementation that wraps an
``ORMap`` with an ``ORSet`` for the map's value. ``ORMap`` with an ``ORSet`` for the map's value.
``PNCounterMap`` (positive negative counter map) is a map of named counters. It is a specialized ``PNCounterMap`` (positive negative counter map) is a map of named counters (where the name can be of any type).
``ORMap`` with ``PNCounter`` values. It is a specialized ``ORMap`` with ``PNCounter`` values.
``LWWMap`` (last writer wins map) is a specialized ``ORMap`` with ``LWWRegister`` (last writer wins register) ``LWWMap`` (last writer wins map) is a specialized ``ORMap`` with ``LWWRegister`` (last writer wins register)
values. values.
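A hedged sketch of the generic key types described above, mirroring the documentation samples elsewhere in this diff (assumes an implicit Cluster named `node` is in scope):

    import akka.cluster.Cluster
    import akka.cluster.ddata.PNCounterMap

    implicit val node: Cluster = Cluster(system) // `system` as in the doc samples
    val m0 = PNCounterMap.empty[Int]  // counter names may now be any type, not only String
    val m1 = m0.increment(7, 3)       // increment the counter named 7 by 3
    val m2 = m1.decrement(7, 1)       // m2.get(7) == Some(2)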
View file
@ -3,26 +3,26 @@
*/ */
package akka.remote.security.provider package akka.remote.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } import java.security.SecureRandom
import java.util.concurrent.Executors
import SeedSize.Seed128 import SeedSize.Seed128
import scala.concurrent.ExecutionContext
/** /**
* INTERNAL API * This class is a wrapper around the 128-bit AESCounterBuiltinRNG AES/CTR PRNG algorithm
* This class is a wrapper around the 128-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/
* The only method used by netty ssl is engineNextBytes(bytes) * The only method used by netty ssl is engineNextBytes(bytes)
* This RNG is good to use to prevent startup delay when you don't have Internet access to random.org
*/ */
class AES128CounterSecureRNG extends java.security.SecureRandomSpi { class AES128CounterSecureRNG extends java.security.SecureRandomSpi {
/**Singleton instance. */ private val singleThreadPool = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(new AESCounterBuiltinRNGReSeeder))
private final val Instance: SecureRandomSeedGenerator = new SecureRandomSeedGenerator private val entropySource = new SecureRandom
private val seed = entropySource.generateSeed(Seed128)
private val rng = new AESCounterBuiltinRNG(seed, singleThreadPool)
/** /**
* Make sure the seed generator is provided by a SecureRandom singleton and not default 'Random' * This is managed internally by AESCounterBuiltinRNG
*/
private val rng = new AESCounterRNG(engineGenerateSeed(Seed128))
/**
* This is managed internally by AESCounterRNG
*/ */
override protected def engineSetSeed(seed: Array[Byte]): Unit = () override protected def engineSetSeed(seed: Array[Byte]): Unit = ()
@ -34,13 +34,11 @@ class AES128CounterSecureRNG extends java.security.SecureRandomSpi {
override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes) override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes)
/** /**
* Unused method * For completeness of SecureRandomSpi API implementation
* Returns the given number of seed bytes. This call may be used to * Returns the given number of seed bytes.
* seed other random number generators.
* *
* @param numBytes the number of seed bytes to generate. * @param numBytes the number of seed bytes to generate.
* @return the seed bytes. * @return the seed bytes.
*/ */
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = Instance.generateSeed(numBytes) override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = entropySource.generateSeed(numBytes)
} }
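A hedged usage sketch: the SPI above is normally reached through JCA. This assumes the AkkaProvider object from this package registers the algorithm under the name "AES128CounterSecureRNG":

    import java.security.SecureRandom

    val rng = SecureRandom.getInstance("AES128CounterSecureRNG", AkkaProvider)
    val bytes = new Array[Byte](16)
    rng.nextBytes(bytes) // delegates to engineNextBytes above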
View file
@ -3,23 +3,26 @@
*/ */
package akka.remote.security.provider package akka.remote.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } import java.security.SecureRandom
import java.util.concurrent.Executors
import SeedSize.Seed256 import SeedSize.Seed256
import scala.concurrent.ExecutionContext
/** /**
* INTERNAL API * This class is a wrapper around the 256-bit AESCounterBuiltinRNG AES/CTR PRNG algorithm
* This class is a wrapper around the 256-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/
* The only method used by netty ssl is engineNextBytes(bytes) * The only method used by netty ssl is engineNextBytes(bytes)
* This RNG is good to use to prevent startup delay when you don't have Internet access to random.org
*/ */
class AES256CounterSecureRNG extends java.security.SecureRandomSpi { class AES256CounterSecureRNG extends java.security.SecureRandomSpi {
/**Singleton instance. */ private val singleThreadPool = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(new AESCounterBuiltinRNGReSeeder))
private final val Instance: SecureRandomSeedGenerator = new SecureRandomSeedGenerator private val entropySource = new SecureRandom
private val seed = entropySource.generateSeed(Seed256)
private val rng = new AESCounterRNG(engineGenerateSeed(Seed256)) private val rng = new AESCounterBuiltinRNG(seed, singleThreadPool)
/** /**
* This is managed internally by AESCounterRNG * This is managed internally by AESCounterBuiltinRNG
*/ */
override protected def engineSetSeed(seed: Array[Byte]): Unit = () override protected def engineSetSeed(seed: Array[Byte]): Unit = ()
@ -31,13 +34,11 @@ class AES256CounterSecureRNG extends java.security.SecureRandomSpi {
override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes) override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes)
/** /**
* Unused method * For completeness of SecureRandomSpi API implementation
* Returns the given number of seed bytes. This call may be used to * Returns the given number of seed bytes.
* seed other random number generators.
* *
* @param numBytes the number of seed bytes to generate. * @param numBytes the number of seed bytes to generate.
* @return the seed bytes. * @return the seed bytes.
*/ */
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = Instance.generateSeed(numBytes) override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = entropySource.generateSeed(numBytes)
} }
View file
@ -0,0 +1,131 @@
/**
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.security.provider
import java.security.{ Key, SecureRandom }
import java.util.Random
import java.util.concurrent.ThreadFactory
import javax.crypto.Cipher
import javax.crypto.spec.IvParameterSpec
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ Await, ExecutionContext, Future, duration }
/**
* INTERNAL API
* This class is a Scala implementation of the AESCounterRNG algorithm
* patterned after org.uncommons.maths.random by Daniel Dyer (Apache License 2.0)
*
* Non-linear random number generator based on the AES block cipher in counter mode.
* Uses the seed as a key to encrypt a 128-bit counter using AES(Rijndael).
*
* Keys larger than 128-bit for the AES cipher require
* the inconvenience of installing the unlimited strength cryptography policy
* files for the Java platform. Larger keys may be used (192 or 256 bits) but if the
* cryptography policy files are not installed, a
* java.security.GeneralSecurityException will be thrown.
*
* NOTE: this class is not serializable
*/
// FIXME add @InternalApi
private[akka] class AESCounterBuiltinRNG(val seed: Array[Byte], implicit val executionContext: ExecutionContext,
val reseedingThreshold: Long = CounterRNGConstants.ReseedingThreshold,
val reseedingDeadline: Long = CounterRNGConstants.ReseedingDeadline,
val reseedingTimeout: Duration = CounterRNGConstants.ReseedingTimeout) extends Random {
import CounterRNGConstants._
private val entropySource = new SecureRandom
// mutable state below, concurrent accesses need synchronized or lock
private val counter: Array[Byte] = Array.fill[Byte](CounterSizeBytes)(0)
private var index: Int = 0
private var currentBlock: Array[Byte] = null
private var reseedFuture: Future[Array[Byte]] = null
private var bitsSinceSeeding: Long = 0
private val cipher = Cipher.getInstance("AES/CTR/NoPadding")
private val ivArr = Array.fill[Byte](CounterSizeBytes)(0)
ivArr(0) = (ivArr(0) + 1.toByte).toByte
private val ivSpec = new IvParameterSpec(ivArr)
cipher.init(Cipher.ENCRYPT_MODE, new this.AESKey(seed), ivSpec)
@Override
override protected def next(bits: Int): Int = synchronized {
// random result generation phase - if there are not enough bits left in the current block
// we generate some more with AES/CTR
bitsSinceSeeding += bits
if (currentBlock == null || currentBlock.length - index < 4) {
try {
currentBlock = cipher.doFinal(counter)
index = 0
} catch {
case ex: Exception
// Generally Cipher.doFinal() from nextBlock may throw various exceptions.
// However this should never happen. If initialisation succeeds without exceptions
// we should be able to proceed indefinitely without exceptions.
throw new IllegalStateException("Failed creating next random block.", ex)
}
}
// now there are enough bits in the current block; generate the pseudo-random result
val result = (BitwiseByteToInt & currentBlock(index + 3)) |
((BitwiseByteToInt & currentBlock(index + 2)) << 8) |
((BitwiseByteToInt & currentBlock(index + 1)) << 16) |
((BitwiseByteToInt & currentBlock(index)) << 24)
// re-seeding phase
// first, we check if reseedingThreshold is exceeded to see if new entropy is required
// we can still proceed without it, but we should ask for it
if (bitsSinceSeeding > reseedingThreshold) {
if (reseedFuture == null) {
// ask for a seed and process async on a separate thread using AESCounterBuiltinRNGReSeeder threadpool
reseedFuture = Future { entropySource.generateSeed(32) }
}
// check if reseedingDeadline is exceeded - in that case we cannot proceed, as that would be insecure
// we need to block on the future to wait for entropy
if (bitsSinceSeeding > reseedingDeadline) {
try {
Await.ready(reseedFuture, reseedingTimeout)
} catch {
case ex: Exception
Console.err.println(s"[ERROR] AESCounterBuiltinRNG re-seeding failed or timed out after ${reseedingTimeout.toSeconds}s!")
}
}
// check if future has completed and retrieve additional entropy if that is the case
if (reseedFuture != null && reseedFuture.isCompleted) {
if (reseedFuture.value.get.isSuccess) { // we have re-seeded with success
val newSeed = reseedFuture.value.get.get // this is safe
cipher.init(Cipher.ENCRYPT_MODE, new this.AESKey(newSeed), ivSpec)
currentBlock = null
bitsSinceSeeding = 0 // reset re-seeding counter
}
reseedFuture = null // request creation of new seed when needed
}
}
index += 4
result >>> (32 - bits)
}
/**
* Trivial key implementation for use with AES cipher.
*/
final private class AESKey(val keyData: Array[Byte]) extends Key {
def getAlgorithm: String = "AES"
def getFormat: String = "RAW"
def getEncoded: Array[Byte] = keyData
}
}
private object CounterRNGConstants {
final val CounterSizeBytes = 16
final val BitwiseByteToInt = 0x000000FF
final val ReseedingThreshold = 1000000000L // threshold for requesting new entropy (should give us ample time to re-seed)
final val ReseedingDeadline = 140737488355328L // deadline for obtaining new entropy 2^47 safe as per SP800-90
final val ReseedingTimeout: FiniteDuration = Duration.apply(5, duration.MINUTES) // timeout for re-seeding (on Linux read from /dev/random)
}
private class AESCounterBuiltinRNGReSeeder extends ThreadFactory {
override def newThread(r: Runnable): Thread = new Thread(r, "AESCounterBuiltinRNGReSeeder")
}
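A hedged sketch of driving the new RNG directly, the way the SecureRandomSpi wrappers earlier in this diff do (names as defined above):

    import java.security.SecureRandom
    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext

    val reseedPool = ExecutionContext.fromExecutorService(
      Executors.newSingleThreadExecutor(new AESCounterBuiltinRNGReSeeder))
    val seed = new SecureRandom().generateSeed(16) // 128-bit AES key
    val rng = new AESCounterBuiltinRNG(seed, reseedPool)
    val buf = new Array[Byte](32)
    rng.nextBytes(buf) // re-seeding, when needed, runs on reseedPool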
View file
@ -119,43 +119,43 @@ public class ReplicatedCache extends AbstractActor {
.match(PutInCache.class, cmd -> receivePutInCache(cmd.key, cmd.value)) .match(PutInCache.class, cmd -> receivePutInCache(cmd.key, cmd.value))
.match(Evict.class, cmd -> receiveEvict(cmd.key)) .match(Evict.class, cmd -> receiveEvict(cmd.key))
.match(GetFromCache.class, cmd -> receiveGetFromCache(cmd.key)) .match(GetFromCache.class, cmd -> receiveGetFromCache(cmd.key))
.match(GetSuccess.class, g -> receiveGetSuccess((GetSuccess<LWWMap<Object>>) g)) .match(GetSuccess.class, g -> receiveGetSuccess((GetSuccess<LWWMap<String, Object>>) g))
.match(NotFound.class, n -> receiveNotFound((NotFound<LWWMap<Object>>) n)) .match(NotFound.class, n -> receiveNotFound((NotFound<LWWMap<String, Object>>) n))
.match(UpdateResponse.class, u -> {}) .match(UpdateResponse.class, u -> {})
.build()); .build());
} }
private void receivePutInCache(String key, Object value) { private void receivePutInCache(String key, Object value) {
Update<LWWMap<Object>> update = new Update<>(dataKey(key), LWWMap.create(), writeLocal(), Update<LWWMap<String, Object>> update = new Update<>(dataKey(key), LWWMap.create(), writeLocal(),
curr -> curr.put(node, key, value)); curr -> curr.put(node, key, value));
replicator.tell(update, self()); replicator.tell(update, self());
} }
private void receiveEvict(String key) { private void receiveEvict(String key) {
Update<LWWMap<Object>> update = new Update<>(dataKey(key), LWWMap.create(), writeLocal(), Update<LWWMap<String, Object>> update = new Update<>(dataKey(key), LWWMap.create(), writeLocal(),
curr -> curr.remove(node, key)); curr -> curr.remove(node, key));
replicator.tell(update, self()); replicator.tell(update, self());
} }
private void receiveGetFromCache(String key) { private void receiveGetFromCache(String key) {
Optional<Object> ctx = Optional.of(new Request(key, sender())); Optional<Object> ctx = Optional.of(new Request(key, sender()));
Get<LWWMap<Object>> get = new Get<>(dataKey(key), readLocal(), ctx); Get<LWWMap<String, Object>> get = new Get<>(dataKey(key), readLocal(), ctx);
replicator.tell(get, self()); replicator.tell(get, self());
} }
private void receiveGetSuccess(GetSuccess<LWWMap<Object>> g) { private void receiveGetSuccess(GetSuccess<LWWMap<String, Object>> g) {
Request req = (Request) g.getRequest().get(); Request req = (Request) g.getRequest().get();
Option<Object> valueOption = g.dataValue().get(req.key); Option<Object> valueOption = g.dataValue().get(req.key);
Optional<Object> valueOptional = Optional.ofNullable(valueOption.isDefined() ? valueOption.get() : null); Optional<Object> valueOptional = Optional.ofNullable(valueOption.isDefined() ? valueOption.get() : null);
req.replyTo.tell(new Cached(req.key, valueOptional), self()); req.replyTo.tell(new Cached(req.key, valueOptional), self());
} }
private void receiveNotFound(NotFound<LWWMap<Object>> n) { private void receiveNotFound(NotFound<LWWMap<String, Object>> n) {
Request req = (Request) n.getRequest().get(); Request req = (Request) n.getRequest().get();
req.replyTo.tell(new Cached(req.key, Optional.empty()), self()); req.replyTo.tell(new Cached(req.key, Optional.empty()), self());
} }
private Key<LWWMap<Object>> dataKey(String entryKey) { private Key<LWWMap<String, Object>> dataKey(String entryKey) {
return LWWMapKey.create("cache-" + Math.abs(entryKey.hashCode()) % 100); return LWWMapKey.create("cache-" + Math.abs(entryKey.hashCode()) % 100);
} }
View file
@ -59,8 +59,8 @@ public class ReplicatedMetrics extends AbstractActor {
private final MemoryMXBean memoryMBean = ManagementFactory.getMemoryMXBean(); private final MemoryMXBean memoryMBean = ManagementFactory.getMemoryMXBean();
private final LoggingAdapter log = Logging.getLogger(context().system(), this); private final LoggingAdapter log = Logging.getLogger(context().system(), this);
private final Key<LWWMap<Long>> usedHeapKey = LWWMapKey.create("usedHeap"); private final Key<LWWMap<String, Long>> usedHeapKey = LWWMapKey.create("usedHeap");
private final Key<LWWMap<Long>> maxHeapKey = LWWMapKey.create("maxHeap"); private final Key<LWWMap<String, Long>> maxHeapKey = LWWMapKey.create("maxHeap");
private final Cancellable tickTask; private final Cancellable tickTask;
private final Cancellable cleanupTask; private final Cancellable cleanupTask;
@ -92,8 +92,8 @@ public class ReplicatedMetrics extends AbstractActor {
receive(ReceiveBuilder receive(ReceiveBuilder
.matchEquals(TICK, t -> receiveTick()) .matchEquals(TICK, t -> receiveTick())
.match(Changed.class, c -> c.key().equals(maxHeapKey), c -> receiveMaxHeapChanged((Changed<LWWMap<Long>>) c)) .match(Changed.class, c -> c.key().equals(maxHeapKey), c -> receiveMaxHeapChanged((Changed<LWWMap<String, Long>>) c))
.match(Changed.class, c -> c.key().equals(usedHeapKey), c -> receiveUsedHeapChanged((Changed<LWWMap<Long>>) c)) .match(Changed.class, c -> c.key().equals(usedHeapKey), c -> receiveUsedHeapChanged((Changed<LWWMap<String, Long>>) c))
.match(UpdateResponse.class, u -> {}) .match(UpdateResponse.class, u -> {})
.match(MemberUp.class, m -> receiveMemberUp(m.member().address())) .match(MemberUp.class, m -> receiveMemberUp(m.member().address()))
.match(MemberRemoved.class, m -> receiveMemberRemoved(m.member().address())) .match(MemberRemoved.class, m -> receiveMemberRemoved(m.member().address()))
@ -106,11 +106,11 @@ public class ReplicatedMetrics extends AbstractActor {
long used = heap.getUsed(); long used = heap.getUsed();
long max = heap.getMax(); long max = heap.getMax();
Update<LWWMap<Long>> update1 = new Update<>(usedHeapKey, LWWMap.create(), writeLocal(), Update<LWWMap<String, Long>> update1 = new Update<>(usedHeapKey, LWWMap.create(), writeLocal(),
curr -> curr.put(node, selfNodeKey, used)); curr -> curr.put(node, selfNodeKey, used));
replicator.tell(update1, self()); replicator.tell(update1, self());
Update<LWWMap<Long>> update2 = new Update<>(maxHeapKey, LWWMap.create(), writeLocal(), curr -> { Update<LWWMap<String, Long>> update2 = new Update<>(maxHeapKey, LWWMap.create(), writeLocal(), curr -> {
if (curr.contains(selfNodeKey) && curr.get(selfNodeKey).get().longValue() == max) if (curr.contains(selfNodeKey) && curr.get(selfNodeKey).get().longValue() == max)
return curr; // unchanged return curr; // unchanged
else else
@ -119,11 +119,11 @@ public class ReplicatedMetrics extends AbstractActor {
replicator.tell(update2, self()); replicator.tell(update2, self());
} }
private void receiveMaxHeapChanged(Changed<LWWMap<Long>> c) { private void receiveMaxHeapChanged(Changed<LWWMap<String, Long>> c) {
maxHeap = c.dataValue().getEntries(); maxHeap = c.dataValue().getEntries();
} }
private void receiveUsedHeapChanged(Changed<LWWMap<Long>> c) { private void receiveUsedHeapChanged(Changed<LWWMap<String, Long>> c) {
Map<String, Double> percentPerNode = new HashMap<>(); Map<String, Double> percentPerNode = new HashMap<>();
for (Map.Entry<String, Long> entry : c.dataValue().getEntries().entrySet()) { for (Map.Entry<String, Long> entry : c.dataValue().getEntries().entrySet()) {
if (maxHeap.containsKey(entry.getKey())) { if (maxHeap.containsKey(entry.getKey())) {
@@ -147,14 +147,14 @@ public class ReplicatedMetrics extends AbstractActor {
} }
private void receiveCleanup() { private void receiveCleanup() {
Update<LWWMap<Long>> update1 = new Update<>(usedHeapKey, LWWMap.create(), writeLocal(), curr -> cleanup(curr)); Update<LWWMap<String, Long>> update1 = new Update<>(usedHeapKey, LWWMap.create(), writeLocal(), curr -> cleanup(curr));
replicator.tell(update1, self()); replicator.tell(update1, self());
Update<LWWMap<Long>> update2 = new Update<>(maxHeapKey, LWWMap.create(), writeLocal(), curr -> cleanup(curr)); Update<LWWMap<String, Long>> update2 = new Update<>(maxHeapKey, LWWMap.create(), writeLocal(), curr -> cleanup(curr));
replicator.tell(update2, self()); replicator.tell(update2, self());
} }
private LWWMap<Long> cleanup(LWWMap<Long> data) { private LWWMap<String, Long> cleanup(LWWMap<String, Long> data) {
LWWMap<Long> result = data; LWWMap<String, Long> result = data;
log.info("Cleanup " + nodesInCluster + " -- " + data.getEntries().keySet()); log.info("Cleanup " + nodesInCluster + " -- " + data.getEntries().keySet());
for (String k : data.getEntries().keySet()) { for (String k : data.getEntries().keySet()) {
if (!nodesInCluster.contains(k)) { if (!nodesInCluster.contains(k)) {
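The modify function for update2 above returns the current map untouched when the stored max already equals the measured value, so the Replicator sees no change and nothing is replicated for a no-op tick. The same guard isolated in a Scala sketch (key and node ids are assumed; the + operator needs an implicit Cluster):

import akka.cluster.Cluster
import akka.cluster.ddata.LWWMap

def recordMax(data: LWWMap[String, Long], node: String, max: Long)(
    implicit cluster: Cluster): LWWMap[String, Long] =
  data.get(node) match {
    case Some(`max`) => data                  // unchanged: nothing to replicate
    case _           => data + (node -> max)  // write only when the value differs
  }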

View file

@@ -130,7 +130,7 @@ public class ShoppingCart extends AbstractActor {
@SuppressWarnings("unused") @SuppressWarnings("unused")
private final String userId; private final String userId;
private final Key<LWWMap<LineItem>> dataKey; private final Key<LWWMap<String, LineItem>> dataKey;
public ShoppingCart(String userId) { public ShoppingCart(String userId) {
this.userId = userId; this.userId = userId;
@@ -149,18 +149,18 @@ public class ShoppingCart extends AbstractActor {
.matchEquals((GET_CART), .matchEquals((GET_CART),
s -> receiveGetCart()) s -> receiveGetCart())
.match(GetSuccess.class, g -> isResponseToGetCart(g), .match(GetSuccess.class, g -> isResponseToGetCart(g),
g -> receiveGetSuccess((GetSuccess<LWWMap<LineItem>>) g)) g -> receiveGetSuccess((GetSuccess<LWWMap<String, LineItem>>) g))
.match(NotFound.class, n -> isResponseToGetCart(n), .match(NotFound.class, n -> isResponseToGetCart(n),
n -> receiveNotFound((NotFound<LWWMap<LineItem>>) n)) n -> receiveNotFound((NotFound<LWWMap<String, LineItem>>) n))
.match(GetFailure.class, f -> isResponseToGetCart(f), .match(GetFailure.class, f -> isResponseToGetCart(f),
f -> receiveGetFailure((GetFailure<LWWMap<LineItem>>) f)) f -> receiveGetFailure((GetFailure<LWWMap<String, LineItem>>) f))
.build(); .build();
} }
private void receiveGetCart() { private void receiveGetCart() {
Optional<Object> ctx = Optional.of(sender()); Optional<Object> ctx = Optional.of(sender());
replicator.tell(new Replicator.Get<LWWMap<LineItem>>(dataKey, readMajority, ctx), replicator.tell(new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx),
self()); self());
} }
@@ -169,21 +169,21 @@ public class ShoppingCart extends AbstractActor {
(response.getRequest().orElse(null) instanceof ActorRef); (response.getRequest().orElse(null) instanceof ActorRef);
} }
private void receiveGetSuccess(GetSuccess<LWWMap<LineItem>> g) { private void receiveGetSuccess(GetSuccess<LWWMap<String, LineItem>> g) {
Set<LineItem> items = new HashSet<>(g.dataValue().getEntries().values()); Set<LineItem> items = new HashSet<>(g.dataValue().getEntries().values());
ActorRef replyTo = (ActorRef) g.getRequest().get(); ActorRef replyTo = (ActorRef) g.getRequest().get();
replyTo.tell(new Cart(items), self()); replyTo.tell(new Cart(items), self());
} }
private void receiveNotFound(NotFound<LWWMap<LineItem>> n) { private void receiveNotFound(NotFound<LWWMap<String, LineItem>> n) {
ActorRef replyTo = (ActorRef) n.getRequest().get(); ActorRef replyTo = (ActorRef) n.getRequest().get();
replyTo.tell(new Cart(new HashSet<>()), self()); replyTo.tell(new Cart(new HashSet<>()), self());
} }
private void receiveGetFailure(GetFailure<LWWMap<LineItem>> f) { private void receiveGetFailure(GetFailure<LWWMap<String, LineItem>> f) {
// ReadMajority failure, try again with local read // ReadMajority failure, try again with local read
Optional<Object> ctx = Optional.of(sender()); Optional<Object> ctx = Optional.of(sender());
replicator.tell(new Replicator.Get<LWWMap<LineItem>>(dataKey, Replicator.readLocal(), replicator.tell(new Replicator.Get<LWWMap<String, LineItem>>(dataKey, Replicator.readLocal(),
ctx), self()); ctx), self());
} }
//#get-cart //#get-cart
@@ -196,14 +196,14 @@ public class ShoppingCart extends AbstractActor {
} }
private void receiveAddItem(AddItem add) { private void receiveAddItem(AddItem add) {
Update<LWWMap<LineItem>> update = new Update<>(dataKey, LWWMap.create(), writeMajority, Update<LWWMap<String, LineItem>> update = new Update<>(dataKey, LWWMap.create(), writeMajority,
cart -> updateCart(cart, add.item)); cart -> updateCart(cart, add.item));
replicator.tell(update, self()); replicator.tell(update, self());
} }
//#add-item //#add-item
private LWWMap<LineItem> updateCart(LWWMap<LineItem> data, LineItem item) { private LWWMap<String, LineItem> updateCart(LWWMap<String, LineItem> data, LineItem item) {
if (data.contains(item.productId)) { if (data.contains(item.productId)) {
LineItem existingItem = data.get(item.productId).get(); LineItem existingItem = data.get(item.productId).get();
int newQuantity = existingItem.quantity + item.quantity; int newQuantity = existingItem.quantity + item.quantity;
@@ -218,9 +218,9 @@ public class ShoppingCart extends AbstractActor {
return ReceiveBuilder return ReceiveBuilder
.match(RemoveItem.class, r -> receiveRemoveItem(r)) .match(RemoveItem.class, r -> receiveRemoveItem(r))
.match(GetSuccess.class, g -> isResponseToRemoveItem(g), .match(GetSuccess.class, g -> isResponseToRemoveItem(g),
g -> receiveRemoveItemGetSuccess((GetSuccess<LWWMap<LineItem>>) g)) g -> receiveRemoveItemGetSuccess((GetSuccess<LWWMap<String, LineItem>>) g))
.match(GetFailure.class, f -> isResponseToRemoveItem(f), .match(GetFailure.class, f -> isResponseToRemoveItem(f),
f -> receiveRemoveItemGetFailure((GetFailure<LWWMap<LineItem>>) f)) f -> receiveRemoveItemGetFailure((GetFailure<LWWMap<String, LineItem>>) f))
.match(NotFound.class, n -> isResponseToRemoveItem(n), n -> {/* nothing to remove */}) .match(NotFound.class, n -> isResponseToRemoveItem(n), n -> {/* nothing to remove */})
.build(); .build();
} }
@@ -230,24 +230,24 @@ public class ShoppingCart extends AbstractActor {
// Try to fetch latest from a majority of nodes first, since ORMap // Try to fetch latest from a majority of nodes first, since ORMap
// remove must have seen the item to be able to remove it. // remove must have seen the item to be able to remove it.
Optional<Object> ctx = Optional.of(rm); Optional<Object> ctx = Optional.of(rm);
replicator.tell(new Replicator.Get<LWWMap<LineItem>>(dataKey, readMajority, ctx), replicator.tell(new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx),
self()); self());
} }
private void receiveRemoveItemGetSuccess(GetSuccess<LWWMap<LineItem>> g) { private void receiveRemoveItemGetSuccess(GetSuccess<LWWMap<String, LineItem>> g) {
RemoveItem rm = (RemoveItem) g.getRequest().get(); RemoveItem rm = (RemoveItem) g.getRequest().get();
removeItem(rm.productId); removeItem(rm.productId);
} }
private void receiveRemoveItemGetFailure(GetFailure<LWWMap<LineItem>> f) { private void receiveRemoveItemGetFailure(GetFailure<LWWMap<String, LineItem>> f) {
// ReadMajority failed, fall back to best effort local value // ReadMajority failed, fall back to best effort local value
RemoveItem rm = (RemoveItem) f.getRequest().get(); RemoveItem rm = (RemoveItem) f.getRequest().get();
removeItem(rm.productId); removeItem(rm.productId);
} }
private void removeItem(String productId) { private void removeItem(String productId) {
Update<LWWMap<LineItem>> update = new Update<>(dataKey, LWWMap.create(), writeMajority, Update<LWWMap<String, LineItem>> update = new Update<>(dataKey, LWWMap.create(), writeMajority,
cart -> cart.remove(node, productId)); cart -> cart.remove(node, productId));
replicator.tell(update, self()); replicator.tell(update, self());
} }
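receiveGetFailure above encodes a deliberate degradation: read from a majority first and, if that times out, answer from the local replica instead of failing the request. A compact Scala sketch of the pattern; the "get" request message and the String value type are assumptions, not the sample's protocol:

import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator._
import scala.concurrent.duration._

class CartReader(dataKey: LWWMapKey[String, String]) extends Actor {
  val replicator = DistributedData(context.system).replicator
  val readMajority = ReadMajority(3.seconds)

  def receive = {
    case "get" =>
      replicator ! Get(dataKey, readMajority, Some(sender()))
    case g @ GetSuccess(`dataKey`, Some(replyTo: ActorRef)) =>
      replyTo ! g.get(dataKey).entries
    case GetFailure(`dataKey`, Some(replyTo: ActorRef)) =>
      // majority read timed out: degrade to a best-effort local read
      replicator ! Get(dataKey, ReadLocal, Some(replyTo))
    case NotFound(`dataKey`, Some(replyTo: ActorRef)) =>
      replyTo ! Map.empty[String, String]
  }
}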

View file

@@ -47,7 +47,7 @@ public class VotingService extends AbstractActor {
private final Key<Flag> openedKey = FlagKey.create("contestOpened"); private final Key<Flag> openedKey = FlagKey.create("contestOpened");
private final Key<Flag> closedKey = FlagKey.create("contestClosed"); private final Key<Flag> closedKey = FlagKey.create("contestClosed");
private final Key<PNCounterMap> countersKey = PNCounterMapKey.create("contestCounters"); private final Key<PNCounterMap<String>> countersKey = PNCounterMapKey.create("contestCounters");
private final WriteConsistency writeAll = new WriteAll(Duration.create(5, SECONDS)); private final WriteConsistency writeAll = new WriteAll(Duration.create(5, SECONDS));
private final ReadConsistency readAll = new ReadAll(Duration.create(3, SECONDS)); private final ReadConsistency readAll = new ReadAll(Duration.create(3, SECONDS));
@@ -96,7 +96,7 @@ public class VotingService extends AbstractActor {
} }
private void receiveVote(Vote vote) { private void receiveVote(Vote vote) {
Update<PNCounterMap> update = new Update<>(countersKey, PNCounterMap.create(), Replicator.writeLocal(), Update<PNCounterMap<String>> update = new Update<>(countersKey, PNCounterMap.create(), Replicator.writeLocal(),
curr -> curr.increment(node, vote.participant, 1)); curr -> curr.increment(node, vote.participant, 1));
replicator.tell(update, self()); replicator.tell(update, self());
} }
@@ -119,26 +119,26 @@ public class VotingService extends AbstractActor {
private PartialFunction<Object, BoxedUnit> matchGetVotes(boolean open) { private PartialFunction<Object, BoxedUnit> matchGetVotes(boolean open) {
return ReceiveBuilder return ReceiveBuilder
.matchEquals(GET_VOTES, s -> receiveGetVotes()) .matchEquals(GET_VOTES, s -> receiveGetVotes())
.match(NotFound.class, n -> n.key().equals(countersKey), n -> receiveNotFound(open, (NotFound<PNCounterMap>) n)) .match(NotFound.class, n -> n.key().equals(countersKey), n -> receiveNotFound(open, (NotFound<PNCounterMap<String>>) n))
.match(GetSuccess.class, g -> g.key().equals(countersKey), .match(GetSuccess.class, g -> g.key().equals(countersKey),
g -> receiveGetSuccess(open, (GetSuccess<PNCounterMap>) g)) g -> receiveGetSuccess(open, (GetSuccess<PNCounterMap<String>>) g))
.match(GetFailure.class, f -> f.key().equals(countersKey), f -> receiveGetFailure()) .match(GetFailure.class, f -> f.key().equals(countersKey), f -> receiveGetFailure())
.match(UpdateSuccess.class, u -> receiveUpdateSuccess()).build(); .match(UpdateSuccess.class, u -> receiveUpdateSuccess()).build();
} }
private void receiveGetVotes() { private void receiveGetVotes() {
Optional<Object> ctx = Optional.of(sender()); Optional<Object> ctx = Optional.of(sender());
replicator.tell(new Replicator.Get<PNCounterMap>(countersKey, readAll, ctx), self()); replicator.tell(new Replicator.Get<PNCounterMap<String>>(countersKey, readAll, ctx), self());
} }
private void receiveGetSuccess(boolean open, GetSuccess<PNCounterMap> g) { private void receiveGetSuccess(boolean open, GetSuccess<PNCounterMap<String>> g) {
Map<String, BigInteger> result = g.dataValue().getEntries(); Map<String, BigInteger> result = g.dataValue().getEntries();
ActorRef replyTo = (ActorRef) g.getRequest().get(); ActorRef replyTo = (ActorRef) g.getRequest().get();
replyTo.tell(new Votes(result, open), self()); replyTo.tell(new Votes(result, open), self());
} }
private void receiveNotFound(boolean open, NotFound<PNCounterMap> n) { private void receiveNotFound(boolean open, NotFound<PNCounterMap<String>> n) {
ActorRef replyTo = (ActorRef) n.getRequest().get(); ActorRef replyTo = (ActorRef) n.getRequest().get();
replyTo.tell(new Votes(new HashMap<>(), open), self()); replyTo.tell(new Votes(new HashMap<>(), open), self());
} }
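With the typed keys introduced here the tallies become PNCounterMap&lt;String&gt;, keyed by participant. A minimal Scala sketch of the two core operations (an implicit Cluster is required for increment):

import akka.cluster.Cluster
import akka.cluster.ddata.PNCounterMap

def vote(counters: PNCounterMap[String], participant: String)(
    implicit cluster: Cluster): PNCounterMap[String] =
  counters.increment(participant, 1) // one increment per vote

def tally(counters: PNCounterMap[String]): Map[String, BigInt] =
  counters.entries // merged totals across all nodes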

View file

@@ -28,7 +28,7 @@ class ReplicatedCache extends Actor {
val replicator = DistributedData(context.system).replicator val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system) implicit val cluster = Cluster(context.system)
def dataKey(entryKey: String): LWWMapKey[Any] = def dataKey(entryKey: String): LWWMapKey[String, Any] =
LWWMapKey("cache-" + math.abs(entryKey.hashCode) % 100) LWWMapKey("cache-" + math.abs(entryKey.hashCode) % 100)
def receive = { def receive = {
@@ -39,12 +39,11 @@ class ReplicatedCache extends Actor {
case GetFromCache(key) => case GetFromCache(key) =>
replicator ! Get(dataKey(key), ReadLocal, Some(Request(key, sender()))) replicator ! Get(dataKey(key), ReadLocal, Some(Request(key, sender())))
case g @ GetSuccess(LWWMapKey(_), Some(Request(key, replyTo))) => case g @ GetSuccess(LWWMapKey(_), Some(Request(key, replyTo))) =>
g.dataValue match { g.get(dataKey(key)).get(key) match {
case data: LWWMap[_] => data.get(key) match {
case Some(value) => replyTo ! Cached(key, Some(value)) case Some(value) => replyTo ! Cached(key, Some(value))
case None => replyTo ! Cached(key, None) case None => replyTo ! Cached(key, None)
} }
}
case NotFound(_, Some(Request(key, replyTo))) => case NotFound(_, Some(Request(key, replyTo))) =>
replyTo ! Cached(key, None) replyTo ! Cached(key, None)
case _: UpdateResponse[_] => // ok case _: UpdateResponse[_] => // ok
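The Scala change above drops the runtime match on the untyped g.dataValue: GetSuccess#get(key) returns the data already typed by its key. Isolated as a sketch:

import akka.cluster.ddata.{ LWWMap, LWWMapKey }
import akka.cluster.ddata.Replicator.GetSuccess

// No `case data: LWWMap[_]` test needed: the key fixes the type.
def lookup(g: GetSuccess[LWWMap[String, Any]], key: String,
           dataKey: LWWMapKey[String, Any]): Option[Any] =
  g.get(dataKey).get(key)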

View file

@@ -53,8 +53,8 @@ class ReplicatedMetrics(measureInterval: FiniteDuration, cleanupInterval: Finite
self, Cleanup)(context.dispatcher) self, Cleanup)(context.dispatcher)
val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
val UsedHeapKey = LWWMapKey[Long]("usedHeap") val UsedHeapKey = LWWMapKey[String, Long]("usedHeap")
val MaxHeapKey = LWWMapKey[Long]("maxHeap") val MaxHeapKey = LWWMapKey[String, Long]("maxHeap")
replicator ! Subscribe(UsedHeapKey, self) replicator ! Subscribe(UsedHeapKey, self)
replicator ! Subscribe(MaxHeapKey, self) replicator ! Subscribe(MaxHeapKey, self)
@@ -75,8 +75,8 @@ class ReplicatedMetrics(measureInterval: FiniteDuration, cleanupInterval: Finite
val heap = memoryMBean.getHeapMemoryUsage val heap = memoryMBean.getHeapMemoryUsage
val used = heap.getUsed val used = heap.getUsed
val max = heap.getMax val max = heap.getMax
replicator ! Update(UsedHeapKey, LWWMap.empty[Long], WriteLocal)(_ + (node -> used)) replicator ! Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal)(_ + (node -> used))
replicator ! Update(MaxHeapKey, LWWMap.empty[Long], WriteLocal) { data => replicator ! Update(MaxHeapKey, LWWMap.empty[String, Long], WriteLocal) { data =>
data.get(node) match { data.get(node) match {
case Some(`max`) => data // unchanged case Some(`max`) => data // unchanged
case _ => data + (node -> max) case _ => data + (node -> max)
@@ -105,11 +105,11 @@ class ReplicatedMetrics(measureInterval: FiniteDuration, cleanupInterval: Finite
context.stop(self) context.stop(self)
case Cleanup => case Cleanup =>
def cleanupRemoved(data: LWWMap[Long]): LWWMap[Long] = def cleanupRemoved(data: LWWMap[String, Long]): LWWMap[String, Long] =
(data.entries.keySet -- nodesInCluster).foldLeft(data) { case (d, key) => d - key } (data.entries.keySet -- nodesInCluster).foldLeft(data) { case (d, key) => d - key }
replicator ! Update(UsedHeapKey, LWWMap.empty[Long], WriteLocal)(cleanupRemoved) replicator ! Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal)(cleanupRemoved)
replicator ! Update(MaxHeapKey, LWWMap.empty[Long], WriteLocal)(cleanupRemoved) replicator ! Update(MaxHeapKey, LWWMap.empty[String, Long], WriteLocal)(cleanupRemoved)
} }
} }
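cleanupRemoved above folds one removal per departed node into the map, so entries survive only for current members. The same step as a standalone sketch (nodesInCluster is the tracked member set; the - operator needs an implicit Cluster):

import akka.cluster.Cluster
import akka.cluster.ddata.LWWMap

def cleanupRemoved(data: LWWMap[String, Long], nodesInCluster: Set[String])(
    implicit cluster: Cluster): LWWMap[String, Long] =
  (data.entries.keySet -- nodesInCluster).foldLeft(data)(_ - _)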

View file

@@ -36,7 +36,7 @@ class ShoppingCart(userId: String) extends Actor {
val replicator = DistributedData(context.system).replicator val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system) implicit val cluster = Cluster(context.system)
val DataKey = LWWMapKey[LineItem]("cart-" + userId) val DataKey = LWWMapKey[String, LineItem]("cart-" + userId)
def receive = receiveGetCart def receive = receiveGetCart
.orElse[Any, Unit](receiveAddItem) .orElse[Any, Unit](receiveAddItem)
@@ -65,14 +65,14 @@ class ShoppingCart(userId: String) extends Actor {
//#add-item //#add-item
def receiveAddItem: Receive = { def receiveAddItem: Receive = {
case cmd @ AddItem(item) => case cmd @ AddItem(item) =>
val update = Update(DataKey, LWWMap.empty[LineItem], writeMajority, Some(cmd)) { val update = Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, Some(cmd)) {
cart => updateCart(cart, item) cart => updateCart(cart, item)
} }
replicator ! update replicator ! update
} }
//#add-item //#add-item
def updateCart(data: LWWMap[LineItem], item: LineItem): LWWMap[LineItem] = def updateCart(data: LWWMap[String, LineItem], item: LineItem): LWWMap[String, LineItem] =
data.get(item.productId) match { data.get(item.productId) match {
case Some(LineItem(_, _, existingQuantity)) => case Some(LineItem(_, _, existingQuantity)) =>
data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity)) data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity))
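updateCart above merges a re-added product by summing quantities instead of overwriting the entry. A self-contained sketch with an assumed LineItem shape matching the sample's fields:

import akka.cluster.Cluster
import akka.cluster.ddata.LWWMap

final case class LineItem(productId: String, title: String, quantity: Int)

def updateCart(data: LWWMap[String, LineItem], item: LineItem)(
    implicit cluster: Cluster): LWWMap[String, LineItem] =
  data.get(item.productId) match {
    case Some(existing) => // product already in cart: sum the quantities
      data + (item.productId -> item.copy(quantity = existing.quantity + item.quantity))
    case None =>
      data + (item.productId -> item)
  }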

View file

@@ -30,7 +30,7 @@ class VotingService extends Actor {
implicit val cluster = Cluster(context.system) implicit val cluster = Cluster(context.system)
val OpenedKey = FlagKey("contestOpened") val OpenedKey = FlagKey("contestOpened")
val ClosedKey = FlagKey("contestClosed") val ClosedKey = FlagKey("contestClosed")
val CountersKey = PNCounterMapKey("contestCounters") val CountersKey = PNCounterMapKey[String]("contestCounters")
replicator ! Subscribe(OpenedKey, self) replicator ! Subscribe(OpenedKey, self)
@@ -54,7 +54,7 @@ class VotingService extends Actor {
def open: Receive = { def open: Receive = {
case v @ Vote(participant) => case v @ Vote(participant) =>
val update = Update(CountersKey, PNCounterMap(), WriteLocal, request = Some(v)) { val update = Update(CountersKey, PNCounterMap[String](), WriteLocal, request = Some(v)) {
_.increment(participant, 1) _.increment(participant, 1)
} }
replicator ! update replicator ! update

View file

@@ -39,7 +39,7 @@ private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Ac
} }
override def unhandled(msg: Any): Unit = msg match { override def unhandled(msg: Any): Unit = msg match {
case Terminated(ref) ⇒ throw new a.DeathPactException(toUntyped(ref)) case Terminated(ref) ⇒ throw a.DeathPactException(toUntyped(ref))
case other ⇒ super.unhandled(other) case other ⇒ super.unhandled(other)
} }
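The adapter above mirrors the untyped default: a Terminated signal left unhandled escalates as DeathPactException, so the watcher fails together with the watched actor. A classic-actors sketch of the convention, with an illustrative ping/pong protocol:

import akka.actor.{ Actor, ActorRef }

class Watcher(peer: ActorRef) extends Actor {
  context.watch(peer)
  def receive = {
    case "ping" => sender() ! "pong"
    // Terminated(peer) is deliberately not matched: the default
    // unhandled() throws DeathPactException, which the default
    // supervisor strategy turns into a stop of this actor.
  }
}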

View file

@@ -156,7 +156,15 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.props"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.props"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadAggregator.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadAggregator.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadAggregator.props") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadAggregator.props"),
// #22035 Make it possible to use anything as the key in a map
FilterAnyProblemStartingWith("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages"),
FilterAnyProblemStartingWith("akka.cluster.ddata.ORMap"),
FilterAnyProblemStartingWith("akka.cluster.ddata.LWWMap"),
FilterAnyProblemStartingWith("akka.cluster.ddata.PNCounterMap"),
FilterAnyProblemStartingWith("akka.cluster.ddata.ORMultiMap")
) )
Map( Map(
@@ -690,6 +698,7 @@ object MiMa extends AutoPlugin {
// #21894 Programmatic configuration of the ActorSystem // #21894 Programmatic configuration of the ActorSystem
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorSystemImpl.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorSystemImpl.this")
) ++ bcIssuesBetween24and25) ) ++ bcIssuesBetween24and25)
// Entries should be added to a section keyed with the latest released version before the change // Entries should be added to a section keyed with the latest released version before the change
) )
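As the trailing comment notes, new exclusions belong under the latest version released before the change. A hedged sketch of the shape of such an entry; the version key and the excluded member are illustrative only:

import com.typesafe.tools.mima.core._

val filters: Map[String, Seq[ProblemFilter]] = Map(
  "2.4.16" -> Seq( // latest release before the change (example version)
    // #NNNNN short reason for the exclusion
    ProblemFilters.exclude[DirectMissingMethodProblem](
      "akka.some.Class.removedMethod") // hypothetical member
  )
)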