Use tuple to update ConsistentHash and routees together, see #944

This commit is contained in:
Patrik Nordwall 2012-09-13 20:44:17 +02:00
parent 5a90d7198c
commit 5c20c07199

View file

@ -164,23 +164,25 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor
else virtualNodesFactor else virtualNodesFactor
var consistentHashRoutees = new AtomicReference[IndexedSeq[ActorRef]] // tuple of routees and the ConsistentHash, updated together in updateConsistentHash
@volatile var consistentHash: ConsistentHash[ActorRef] = null val consistentHashRef = new AtomicReference[(IndexedSeq[ActorRef], ConsistentHash[ActorRef])]((null, null))
updateConsistentHash() updateConsistentHash()
// update consistentHash when routees has changed // update consistentHash when routees has changed
// changes to routees are rare and when no changes this is a quick operation // changes to routees are rare and when no changes this is a quick operation
def updateConsistentHash(): ConsistentHash[ActorRef] = { def updateConsistentHash(): ConsistentHash[ActorRef] = {
val currentRoutees = routeeProvider.routees val currentRoutees = routeeProvider.routees
if (currentRoutees ne consistentHashRoutees.get) { val oldConsistentHashTuple = consistentHashRef.get
val oldConsistentHashRoutees = consistentHashRoutees.get val (oldConsistentHashRoutees, oldConsistentHash) = oldConsistentHashTuple
val rehash = currentRoutees != oldConsistentHashRoutees if (currentRoutees ne oldConsistentHashRoutees) {
// when other instance, same content, no need to re-hash, but try to set currentRoutees // when other instance, same content, no need to re-hash, but try to set routees
val consistentHash =
if (currentRoutees == oldConsistentHashRoutees) oldConsistentHash
else ConsistentHash(currentRoutees, vnodes) // re-hash
// ignore, don't update, in case of CAS failure // ignore, don't update, in case of CAS failure
if (consistentHashRoutees.compareAndSet(oldConsistentHashRoutees, currentRoutees) && rehash) consistentHashRef.compareAndSet(oldConsistentHashTuple, (currentRoutees, consistentHash))
consistentHash = ConsistentHash(currentRoutees, vnodes)
}
consistentHash consistentHash
} else oldConsistentHash
} }
def target(hashData: Any): ActorRef = try { def target(hashData: Any): ActorRef = try {