Some optimizations for the ConsistentHashingRouter
This commit is contained in:
parent
64a1fb0235
commit
44f4fdb003
2 changed files with 33 additions and 23 deletions
|
|
@ -24,10 +24,9 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) {
|
||||||
if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1")
|
if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1")
|
||||||
|
|
||||||
// sorted hash values of the nodes
|
// sorted hash values of the nodes
|
||||||
private val nodeRing: Array[Int] = {
|
private val (nodeHashRing: Array[Int], nodeRing: Vector[T]) = {
|
||||||
val nodeRing = nodes.keys.toArray
|
val (nhr: IndexedSeq[Int], nr: IndexedSeq[AnyRef]) = nodes.toArray.sortBy(_._1).unzip
|
||||||
Arrays.sort(nodeRing)
|
(nhr.toArray, Vector[T]() ++ nr)
|
||||||
nodeRing
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -62,6 +61,17 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) {
|
||||||
*/
|
*/
|
||||||
def remove(node: T): ConsistentHash[T] = this :- node
|
def remove(node: T): ConsistentHash[T] = this :- node
|
||||||
|
|
||||||
|
// converts the result of Arrays.binarySearch into a index in the nodeRing array
|
||||||
|
// see documentation of Arrays.binarySearch for what it returns
|
||||||
|
private def idx(i: Int): Int = {
|
||||||
|
if (i >= 0) i // exact match
|
||||||
|
else {
|
||||||
|
val j = math.abs(i + 1)
|
||||||
|
if (j >= nodeHashRing.length) 0 // after last, use first
|
||||||
|
else j // next node clockwise
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the node responsible for the data key.
|
* Get the node responsible for the data key.
|
||||||
* Can only be used if nodes exists in the node ring,
|
* Can only be used if nodes exists in the node ring,
|
||||||
|
|
@ -70,18 +80,18 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) {
|
||||||
def nodeFor(key: Array[Byte]): T = {
|
def nodeFor(key: Array[Byte]): T = {
|
||||||
if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key)
|
if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key)
|
||||||
|
|
||||||
// converts the result of Arrays.binarySearch into a index in the nodeRing array
|
nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key))))
|
||||||
// see documentation of Arrays.binarySearch for what it returns
|
|
||||||
def idx(i: Int): Int = {
|
|
||||||
if (i >= 0) i // exact match
|
|
||||||
else {
|
|
||||||
val j = math.abs(i + 1)
|
|
||||||
if (j >= nodeRing.length) 0 // after last, use first
|
|
||||||
else j // next node clockwise
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
val nodeRingIndex = idx(Arrays.binarySearch(nodeRing, hashFor(key)))
|
/**
|
||||||
nodes(nodeRing(nodeRingIndex))
|
* Get the node responsible for the data key.
|
||||||
|
* Can only be used if nodes exists in the node ring,
|
||||||
|
* otherwise throws `IllegalStateException`
|
||||||
|
*/
|
||||||
|
def nodeFor(key: String): T = {
|
||||||
|
if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key)
|
||||||
|
|
||||||
|
nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key))))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -111,5 +121,6 @@ object ConsistentHash {
|
||||||
hashFor((node + ":" + vnode).getBytes("UTF-8"))
|
hashFor((node + ":" + vnode).getBytes("UTF-8"))
|
||||||
|
|
||||||
private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes)
|
private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes)
|
||||||
}
|
|
||||||
|
|
||||||
|
private def hashFor(string: String): Int = MurmurHash.stringHash(string)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -264,14 +264,13 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
|
||||||
}
|
}
|
||||||
|
|
||||||
def target(hashData: Any): ActorRef = try {
|
def target(hashData: Any): ActorRef = try {
|
||||||
val hash = hashData match {
|
|
||||||
case bytes: Array[Byte] ⇒ bytes
|
|
||||||
case str: String ⇒ str.getBytes("UTF-8")
|
|
||||||
case x: AnyRef ⇒ SerializationExtension(routeeProvider.context.system).serialize(x).get
|
|
||||||
}
|
|
||||||
val currentConsistenHash = updateConsistentHash()
|
val currentConsistenHash = updateConsistentHash()
|
||||||
if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters
|
if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters
|
||||||
else currentConsistenHash.nodeFor(hash)
|
else hashData match {
|
||||||
|
case bytes: Array[Byte] ⇒ currentConsistenHash.nodeFor(bytes)
|
||||||
|
case str: String ⇒ currentConsistenHash.nodeFor(str)
|
||||||
|
case x: AnyRef ⇒ currentConsistenHash.nodeFor(SerializationExtension(routeeProvider.context.system).serialize(x).get)
|
||||||
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case NonFatal(e) ⇒
|
case NonFatal(e) ⇒
|
||||||
// serialization failed
|
// serialization failed
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue