Rename a few things, see #944
* hashKey * hashMapping ConsistentHashMapping * withHashMapper ConsistentHashMapper
This commit is contained in:
parent
b4f5948340
commit
64a1fb0235
6 changed files with 54 additions and 51 deletions
|
|
@ -12,6 +12,7 @@ import akka.actor.actorRef2Scala
|
|||
import akka.pattern.ask
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashable
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit._
|
||||
|
||||
|
|
@ -62,40 +63,40 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c
|
|||
"select destination based on consistentHashKey of the message" in {
|
||||
router1 ! Msg("a", "A")
|
||||
val destinationA = expectMsgType[ActorRef]
|
||||
router1 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a")
|
||||
router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
|
||||
expectMsg(destinationA)
|
||||
|
||||
router1 ! Msg(17, "B")
|
||||
val destinationB = expectMsgType[ActorRef]
|
||||
router1 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17)
|
||||
router1 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17)
|
||||
expectMsg(destinationB)
|
||||
|
||||
router1 ! Msg(MsgKey("c"), "C")
|
||||
val destinationC = expectMsgType[ActorRef]
|
||||
router1 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c"))
|
||||
router1 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c"))
|
||||
expectMsg(destinationC)
|
||||
}
|
||||
|
||||
"select destination with defined consistentHashRoute" in {
|
||||
def consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = {
|
||||
def hashMapping: ConsistentHashMapping = {
|
||||
case Msg2(key, data) ⇒ key
|
||||
}
|
||||
val router2 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter(
|
||||
consistentHashRoute = consistentHashRoute)), "router2")
|
||||
hashMapping = hashMapping)), "router2")
|
||||
|
||||
router2 ! Msg2("a", "A")
|
||||
val destinationA = expectMsgType[ActorRef]
|
||||
router2 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a")
|
||||
router2 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
|
||||
expectMsg(destinationA)
|
||||
|
||||
router2 ! Msg2(17, "B")
|
||||
val destinationB = expectMsgType[ActorRef]
|
||||
router2 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17)
|
||||
router2 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17)
|
||||
expectMsg(destinationB)
|
||||
|
||||
router2 ! Msg2(MsgKey("c"), "C")
|
||||
val destinationC = expectMsgType[ActorRef]
|
||||
router2 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c"))
|
||||
router2 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c"))
|
||||
expectMsg(destinationC)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ object ConsistentHashingRouter {
|
|||
}
|
||||
|
||||
/**
|
||||
* If you don't define the consistentHashRoute when
|
||||
* If you don't define the `hashMapping` when
|
||||
* constructing the [[akka.routing.ConsistentHashingRouter]]
|
||||
* the messages need to implement this interface to define what
|
||||
* data to use for the consistent hash key. Note that it's not
|
||||
|
|
@ -49,7 +49,7 @@ object ConsistentHashingRouter {
|
|||
}
|
||||
|
||||
/**
|
||||
* If you don't define the consistentHashRoute when
|
||||
* If you don't define the `hashMapping` when
|
||||
* constructing the [[akka.routing.ConsistentHashingRouter]]
|
||||
* and messages can't implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]]
|
||||
* themselves they can we wrapped by this envelope instead. The
|
||||
|
|
@ -57,8 +57,10 @@ object ConsistentHashingRouter {
|
|||
* i.e. the envelope will be stripped off.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case class ConsistentHashableEnvelope(message: Any, consistentHashKey: Any)
|
||||
extends ConsistentHashable with RouterEnvelope
|
||||
final case class ConsistentHashableEnvelope(message: Any, hashKey: Any)
|
||||
extends ConsistentHashable with RouterEnvelope {
|
||||
override def consistentHashKey: Any = hashKey
|
||||
}
|
||||
|
||||
/**
|
||||
* Partial function from message to the data to
|
||||
|
|
@ -69,12 +71,12 @@ object ConsistentHashingRouter {
|
|||
* otherwise the configured [[akka.akka.serialization.Serializer]]
|
||||
* will be applied to the returned data.
|
||||
*/
|
||||
type ConsistentHashRoute = PartialFunction[Any, Any]
|
||||
type ConsistentHashMapping = PartialFunction[Any, Any]
|
||||
|
||||
@SerialVersionUID(1L)
|
||||
object emptyConsistentHashRoute extends ConsistentHashRoute {
|
||||
object emptyConsistentHashMapping extends ConsistentHashMapping {
|
||||
def isDefinedAt(x: Any) = false
|
||||
def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashRoute apply()")
|
||||
def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashMapping apply()")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -90,8 +92,8 @@ object ConsistentHashingRouter {
|
|||
* otherwise the configured [[akka.akka.serialization.Serializer]]
|
||||
* will be applied to the returned data.
|
||||
*/
|
||||
trait ConsistentHashMapping {
|
||||
def consistentHashKey(message: Any): Any
|
||||
trait ConsistentHashMapper {
|
||||
def hashKey(message: Any): Any
|
||||
}
|
||||
}
|
||||
/**
|
||||
|
|
@ -100,7 +102,7 @@ object ConsistentHashingRouter {
|
|||
*
|
||||
* There is 3 ways to define what data to use for the consistent hash key.
|
||||
*
|
||||
* 1. You can define `consistentHashRoute` / `withConsistentHashMapping`
|
||||
* 1. You can define `hashMapping` / `withHashMapper`
|
||||
* of the router to map incoming messages to their consistent hash key.
|
||||
* This makes the decision transparent for the sender.
|
||||
*
|
||||
|
|
@ -113,7 +115,7 @@ object ConsistentHashingRouter {
|
|||
* the key to use.
|
||||
*
|
||||
* These ways to define the consistent hash key can be use together and at
|
||||
* the same time for one router. The `consistentHashRoute` is tried first.
|
||||
* the same time for one router. The `hashMapping` is tried first.
|
||||
*
|
||||
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
|
||||
* sense as this means that the router should both create new actors and use the 'routees'
|
||||
|
|
@ -137,7 +139,7 @@ object ConsistentHashingRouter {
|
|||
* @param routees string representation of the actor paths of the routees that will be looked up
|
||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||
* @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]]
|
||||
* @param consistentHashRoute partial function from message to the data to
|
||||
* @param hashMapping partial function from message to the data to
|
||||
* use for the consistent hash key
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
|
|
@ -146,7 +148,7 @@ case class ConsistentHashingRouter(
|
|||
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy,
|
||||
val virtualNodesFactor: Int = 0,
|
||||
val consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = ConsistentHashingRouter.emptyConsistentHashRoute)
|
||||
val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping)
|
||||
extends RouterConfig with ConsistentHashingLike {
|
||||
|
||||
/**
|
||||
|
|
@ -188,10 +190,10 @@ case class ConsistentHashingRouter(
|
|||
/**
|
||||
* Java API for setting the mapping from message to the data to use for the consistent hash key.
|
||||
*/
|
||||
def withConsistentHashMapping(mapping: ConsistentHashingRouter.ConsistentHashMapping) = {
|
||||
copy(consistentHashRoute = {
|
||||
case message if (mapping.consistentHashKey(message).asInstanceOf[AnyRef] ne null) ⇒
|
||||
mapping.consistentHashKey(message)
|
||||
def withHashMapper(mapping: ConsistentHashingRouter.ConsistentHashMapper) = {
|
||||
copy(hashMapping = {
|
||||
case message if (mapping.hashKey(message).asInstanceOf[AnyRef] ne null) ⇒
|
||||
mapping.hashKey(message)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -199,7 +201,7 @@ case class ConsistentHashingRouter(
|
|||
* Uses the resizer of the given RouterConfig if this RouterConfig
|
||||
* doesn't have one, i.e. the resizer defined in code is used if
|
||||
* resizer was not defined in config.
|
||||
* Uses the the consistentHashRoute defined in code, since
|
||||
* Uses the the `hashMapping` defined in code, since
|
||||
* that can't be defined in configuration.
|
||||
*/
|
||||
override def withFallback(other: RouterConfig): RouterConfig = other match {
|
||||
|
|
@ -208,7 +210,7 @@ case class ConsistentHashingRouter(
|
|||
val useResizer =
|
||||
if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer
|
||||
else this.resizer
|
||||
copy(resizer = useResizer, consistentHashRoute = otherRouter.consistentHashRoute)
|
||||
copy(resizer = useResizer, hashMapping = otherRouter.hashMapping)
|
||||
case _ ⇒ throw new IllegalArgumentException("Expected ConsistentHashingRouter, got [%s]".format(other))
|
||||
}
|
||||
}
|
||||
|
|
@ -227,7 +229,7 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
|
|||
|
||||
def virtualNodesFactor: Int
|
||||
|
||||
def consistentHashRoute: ConsistentHashRoute
|
||||
def hashMapping: ConsistentHashMapping
|
||||
|
||||
override def createRoute(routeeProvider: RouteeProvider): Route = {
|
||||
if (resizer.isEmpty) {
|
||||
|
|
@ -273,7 +275,7 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
|
|||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
// serialization failed
|
||||
log.warning("Couldn't route message with consistentHashKey [{}] due to [{}]", hashData, e.getMessage)
|
||||
log.warning("Couldn't route message with consistent hash key [{}] due to [{}]", hashData, e.getMessage)
|
||||
routeeProvider.context.system.deadLetters
|
||||
}
|
||||
|
||||
|
|
@ -281,11 +283,11 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
|
|||
case (sender, message) ⇒
|
||||
message match {
|
||||
case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
|
||||
case _ if consistentHashRoute.isDefinedAt(message) ⇒
|
||||
List(Destination(sender, target(consistentHashRoute(message))))
|
||||
case _ if hashMapping.isDefinedAt(message) ⇒
|
||||
List(Destination(sender, target(hashMapping(message))))
|
||||
case hashable: ConsistentHashable ⇒ List(Destination(sender, target(hashable.consistentHashKey)))
|
||||
case other ⇒
|
||||
log.warning("Message [{}] must be handled by consistentHashRoute, or implement [{}] or be wrapped in [{}]",
|
||||
log.warning("Message [{}] must be handled by hashMapping, or implement [{}] or be wrapped in [{}]",
|
||||
message.getClass.getName, classOf[ConsistentHashable].getName,
|
||||
classOf[ConsistentHashableEnvelope].getName)
|
||||
List(Destination(sender, routeeProvider.context.system.deadLetters))
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import java.io.Serializable;
|
|||
import akka.actor.Props;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.routing.ConsistentHashingRouter;
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping;
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashMapper;
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
|
||||
//#imports2
|
||||
|
||||
|
|
@ -40,7 +40,7 @@ public class ConsistentHashingRouterDocTestBase {
|
|||
}
|
||||
|
||||
//#cache-actor
|
||||
|
||||
|
||||
public static class Cache extends UntypedActor {
|
||||
Map<String, String> cache = new HashMap<String, String>();
|
||||
|
||||
|
|
@ -62,14 +62,14 @@ public class ConsistentHashingRouterDocTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
public static class Evict implements Serializable {
|
||||
public static final class Evict implements Serializable {
|
||||
public final String key;
|
||||
public Evict(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Get implements Serializable, ConsistentHashable {
|
||||
public static final class Get implements Serializable, ConsistentHashable {
|
||||
public final String key;
|
||||
public Get(String key) {
|
||||
this.key = key;
|
||||
|
|
@ -79,7 +79,7 @@ public class ConsistentHashingRouterDocTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
public static class Entry implements Serializable {
|
||||
public static final class Entry implements Serializable {
|
||||
public final String key;
|
||||
public final String value;
|
||||
public Entry(String key, String value) {
|
||||
|
|
@ -99,9 +99,9 @@ public class ConsistentHashingRouterDocTestBase {
|
|||
|
||||
//#consistent-hashing-router
|
||||
|
||||
final ConsistentHashMapping consistentHashMapping = new ConsistentHashMapping() {
|
||||
final ConsistentHashMapper hashMapper = new ConsistentHashMapper() {
|
||||
@Override
|
||||
public Object consistentHashKey(Object message) {
|
||||
public Object hashKey(Object message) {
|
||||
if (message instanceof Evict) {
|
||||
return ((Evict) message).key;
|
||||
} else {
|
||||
|
|
@ -111,7 +111,7 @@ public class ConsistentHashingRouterDocTestBase {
|
|||
};
|
||||
|
||||
ActorRef cache = system.actorOf(new Props(Cache.class).withRouter(
|
||||
new ConsistentHashingRouter(10).withConsistentHashMapping(consistentHashMapping)),
|
||||
new ConsistentHashingRouter(10).withHashMapper(hashMapper)),
|
||||
"cache");
|
||||
|
||||
cache.tell(new ConsistentHashableEnvelope(
|
||||
|
|
|
|||
|
|
@ -286,7 +286,7 @@ insight into how consistent hashing is implemented.
|
|||
|
||||
There is 3 ways to define what data to use for the consistent hash key.
|
||||
|
||||
* You can define ``withConsistentHashMapping`` of the router to map incoming
|
||||
* You can define ``withHashMapper`` of the router to map incoming
|
||||
messages to their consistent hash key. This makes the the decision
|
||||
transparent for the sender.
|
||||
|
||||
|
|
@ -299,7 +299,7 @@ There is 3 ways to define what data to use for the consistent hash key.
|
|||
the key to use.
|
||||
|
||||
These ways to define the consistent hash key can be use together and at
|
||||
the same time for one router. The ``withConsistentHashMapping`` is tried first.
|
||||
the same time for one router. The ``withHashMapper`` is tried first.
|
||||
|
||||
Code example:
|
||||
|
||||
|
|
@ -311,7 +311,7 @@ Code example:
|
|||
|
||||
In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself,
|
||||
while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict``
|
||||
message is handled by the ``withConsistentHashMapping``.
|
||||
message is handled by the ``withHashMapper``.
|
||||
|
||||
This is an example of how to define a consistent-hashing router in configuration:
|
||||
|
||||
|
|
|
|||
|
|
@ -42,20 +42,20 @@ class ConsistentHashingRouterDocSpec extends AkkaSpec with ImplicitSender {
|
|||
//#consistent-hashing-router
|
||||
import akka.actor.Props
|
||||
import akka.routing.ConsistentHashingRouter
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashRoute
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
|
||||
|
||||
def consistentHashRoute: ConsistentHashRoute = {
|
||||
def hashMapping: ConsistentHashMapping = {
|
||||
case Evict(key) ⇒ key
|
||||
}
|
||||
|
||||
val cache = system.actorOf(Props[Cache].withRouter(ConsistentHashingRouter(10,
|
||||
consistentHashRoute = consistentHashRoute)), name = "cache")
|
||||
hashMapping = hashMapping)), name = "cache")
|
||||
|
||||
cache ! ConsistentHashableEnvelope(
|
||||
message = Entry("hello", "HELLO"), consistentHashKey = "hello")
|
||||
message = Entry("hello", "HELLO"), hashKey = "hello")
|
||||
cache ! ConsistentHashableEnvelope(
|
||||
message = Entry("hi", "HI"), consistentHashKey = "hi")
|
||||
message = Entry("hi", "HI"), hashKey = "hi")
|
||||
|
||||
cache ! Get("hello")
|
||||
expectMsg(Some("HELLO"))
|
||||
|
|
|
|||
|
|
@ -291,7 +291,7 @@ insight into how consistent hashing is implemented.
|
|||
|
||||
There is 3 ways to define what data to use for the consistent hash key.
|
||||
|
||||
* You can define ``consistentHashRoute`` of the router to map incoming
|
||||
* You can define ``hashMapping`` of the router to map incoming
|
||||
messages to their consistent hash key. This makes the decision
|
||||
transparent for the sender.
|
||||
|
||||
|
|
@ -304,7 +304,7 @@ There is 3 ways to define what data to use for the consistent hash key.
|
|||
the key to use.
|
||||
|
||||
These ways to define the consistent hash key can be use together and at
|
||||
the same time for one router. The ``consistentHashRoute`` is tried first.
|
||||
the same time for one router. The ``hashMapping`` is tried first.
|
||||
|
||||
Code example:
|
||||
|
||||
|
|
@ -314,7 +314,7 @@ Code example:
|
|||
|
||||
In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself,
|
||||
while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict``
|
||||
message is handled by the ``consistentHashRoute`` partial function.
|
||||
message is handled by the ``hashMapping`` partial function.
|
||||
|
||||
This is an example of how to define a consistent-hashing router in configuration:
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue