Fixed minor stuff in Gossiper after code review feedback.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-10-27 15:14:15 +02:00
parent c8b17b9e92
commit c1152a0b42

View file

@ -14,7 +14,7 @@ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.Random import java.security.SecureRandom
import System.{ currentTimeMillis newTimestamp } import System.{ currentTimeMillis newTimestamp }
import scala.collection.immutable.Map import scala.collection.immutable.Map
@ -112,8 +112,8 @@ class Gossiper(remote: Remote) {
private val address = new InetSocketAddress(app.hostname, app.port) private val address = new InetSocketAddress(app.hostname, app.port)
private val nodeFingerprint = address.## private val nodeFingerprint = address.##
private val random = new Random(newTimestamp) private val random = SecureRandom.getInstance("SHA1PRNG")
private val initalDelayForGossip = 5 seconds // FIXME make configurablev private val initalDelayForGossip = 5 seconds // FIXME make configurable
private val gossipFrequency = 1 seconds // FIXME make configurable private val gossipFrequency = 1 seconds // FIXME make configurable
private val timeUnit = { private val timeUnit = {
assert(gossipFrequency.unit == initalDelayForGossip.unit) assert(gossipFrequency.unit == initalDelayForGossip.unit)
@ -216,14 +216,12 @@ class Gossiper(remote: Remote) {
if (random.nextDouble() < probability) gossipTo(oldUnavailableNodes) if (random.nextDouble() < probability) gossipTo(oldUnavailableNodes)
} }
if (!gossipedToSeed || oldAvailableNodesSize < 1) { // 3. gossip to a seed for facilitating partition healing
// 3. gossip to a seed for facilitating partition healing if ((!gossipedToSeed || oldAvailableNodesSize < 1) && (seeds.head != address)) {
if (seeds.head != address) { if (oldAvailableNodesSize == 0) gossipTo(seeds)
if (oldAvailableNodesSize == 0) gossipTo(seeds) else {
else { val probability = 1.0 / oldAvailableNodesSize + oldUnavailableNodesSize
val probability = 1.0 / oldAvailableNodesSize + oldUnavailableNodesSize if (random.nextDouble() <= probability) gossipTo(seeds)
if (random.nextDouble() <= probability) gossipTo(seeds)
}
} }
} }
} }
@ -247,17 +245,14 @@ class Gossiper(remote: Remote) {
case Some(Failure(cause)) case Some(Failure(cause))
app.eventHandler.error(cause, this, cause.toString) app.eventHandler.error(cause, this, cause.toString)
throw cause
case None case None
val error = new RemoteException("Gossip to [%s] timed out".format(connection.address)) val error = new RemoteException("Gossip to [%s] timed out".format(connection.address))
app.eventHandler.error(error, this, error.toString) app.eventHandler.error(error, this, error.toString)
throw error
} }
} catch { } catch {
case e: Exception case e: Exception
app.eventHandler.error(e, this, "Could not gossip to [%s] due to: %s".format(connection.address, e.toString)) app.eventHandler.error(e, this, "Could not gossip to [%s] due to: %s".format(connection.address, e.toString))
throw e
} }
seeds exists (peer == _) seeds exists (peer == _)
@ -274,7 +269,7 @@ class Gossiper(remote: Remote) {
val oldAvailableNodes = oldGossip.availableNodes val oldAvailableNodes = oldGossip.availableNodes
val oldUnavailableNodes = oldGossip.unavailableNodes val oldUnavailableNodes = oldGossip.unavailableNodes
val newlyDetectedUnavailableNodes = oldAvailableNodes filter (!failureDetector.isAvailable(_)) val newlyDetectedUnavailableNodes = oldAvailableNodes filterNot failureDetector.isAvailable
if (!newlyDetectedUnavailableNodes.isEmpty) { // we have newly detected nodes marked as unavailable if (!newlyDetectedUnavailableNodes.isEmpty) { // we have newly detected nodes marked as unavailable
val newAvailableNodes = oldAvailableNodes diff newlyDetectedUnavailableNodes val newAvailableNodes = oldAvailableNodes diff newlyDetectedUnavailableNodes