From 993947335b258639fa3018abfa73fad97c2f2ab3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 25 Nov 2011 12:02:25 +0100 Subject: [PATCH] Added configuration for seed nodes in RemoteExtension and Gossiper. Also cleaned up reference config from old cluster stuff. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../scala/akka/remote/RemoteInterface.scala | 19 +++++++++---- .../main/resources/akka-remote-reference.conf | 28 +++---------------- .../src/main/scala/akka/remote/Gossiper.scala | 18 +++++++++--- .../scala/akka/remote/RemoteExtension.scala | 4 ++- 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index 7d33508f46..0dddf27452 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -13,7 +13,6 @@ import java.io.{ PrintWriter, PrintStream } import java.net.InetSocketAddress object RemoteAddress { - def apply(host: String, port: Int): RemoteAddress = apply(new InetSocketAddress(host, port)) def apply(inetAddress: InetSocketAddress): RemoteAddress = inetAddress match { case null ⇒ null case inet ⇒ @@ -22,18 +21,26 @@ object RemoteAddress { case other ⇒ other.getHostAddress } val portNo = inet.getPort - RemoteAddress(portNo, host) + RemoteAddress(host, portNo) + } + + def apply(address: String): RemoteAddress = { + val index = address.indexOf(":") + if (index < 1) throw new IllegalArgumentException( + "Remote address must be a string on the format [\"hostname:port\"], was [" + address + "]") + RemoteAddress( + address.substring(0, index - 1), + address.substring(index, address.length).toInt) } } -object LocalOnly extends RemoteAddress(0, "local") - -case class RemoteAddress private[akka] (port: Int, hostname: String) { +case class RemoteAddress(hostname: String, port: Int) { @transient override lazy val toString = "" + hostname + ":" + port - } +object LocalOnly extends RemoteAddress("local", 0) + class RemoteException(message: String) extends AkkaException(message) trait RemoteModule { diff --git a/akka-remote/src/main/resources/akka-remote-reference.conf b/akka-remote/src/main/resources/akka-remote-reference.conf index 4d31549b73..6d6db6f31c 100644 --- a/akka-remote/src/main/resources/akka-remote-reference.conf +++ b/akka-remote/src/main/resources/akka-remote-reference.conf @@ -10,8 +10,8 @@ akka { remote { # FIXME rename to transport layer = "akka.cluster.netty.NettyRemoteSupport" - - use-compression = off + + use-compression = off secure-cookie = "" # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' # or using 'akka.util.Crypt.generateSecureCookie' @@ -50,28 +50,8 @@ akka { reconnection-time-window = 600s # Maximum time window that a client should try to reconnect for } } - - // TODO cluster config will go into akka-cluster-reference.conf when we enable that module + cluster { - name = "test-cluster" - nodename = "" - zookeeper-server-addresses = "localhost:2181" # comma-separated list of ':' elements - max-time-to-wait-until-connected = 30s - session-timeout = 60s - connection-timeout = 60s - include-ref-node-in-replica-set = on # Can a replica be instantiated on the same node as the cluster reference to the actor - # Default: on - log-directory = "_akka_cluster" # Where ZooKeeper should store the logs and data files - - replication { - digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password) - password = "secret" # FIXME: store open in file? - ensemble-size = 3 - quorum-size = 2 - snapshot-frequency = 1000 # The number of messages that should be logged between every actor snapshot - timeout = 30s # Timeout for asyncronous (write-behind) operations - } + seed-nodes = ["wallace:2552", "gromit:2552"] } - - } diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 3735f6ceaf..243a5f9596 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -11,13 +11,17 @@ import akka.util.duration._ import akka.util.Duration import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ +import akka.config.ConfigurationException +import akka.serialization.SerializationExtension + import java.util.concurrent.atomic.AtomicReference import java.security.SecureRandom import System.{ currentTimeMillis ⇒ newTimestamp } + import scala.collection.immutable.Map import scala.annotation.tailrec + import com.google.protobuf.ByteString -import akka.serialization.SerializationExtension /** * Interface for node membership change listener. @@ -36,9 +40,8 @@ case class Gossip( availableNodes: Set[RemoteAddress] = Set.empty[RemoteAddress], unavailableNodes: Set[RemoteAddress] = Set.empty[RemoteAddress]) +// ====== START - NEW GOSSIP IMPLEMENTATION ====== /* - // ====== NEW GOSSIP IMPLEMENTATION ====== - case class Gossip( version: VectorClock, node: RemoteAddress, @@ -74,6 +77,7 @@ case class Gossip( changes: Vector[VNodeMod], status: PendingPartitioningStatus) */ +// ====== END - NEW GOSSIP IMPLEMENTATION ====== /** * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live @@ -106,7 +110,13 @@ class Gossiper(remote: Remote) { private val log = Logging(system, "Gossiper") private val failureDetector = remote.failureDetector private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef]) - private val seeds = Set(address) // FIXME read in list of seeds from config + + private val seeds = { + val seeds = RemoteExtension(system).settings.SeedNodes + if (seeds.isEmpty) throw new ConfigurationException( + "At least one seed node must be defined in the configuration [akka.cluster.seed-nodes]") + else seeds + } private val address = system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress private val nodeFingerprint = address.## diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index e897bcf713..b8e4f89f40 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -17,6 +17,8 @@ import java.net.InetAddress import akka.config.ConfigurationException import com.eaio.uuid.UUID +import scala.collection.JavaConverters._ + object RemoteExtensionKey extends ExtensionKey[RemoteExtension] object RemoteExtension { @@ -43,6 +45,7 @@ object RemoteExtension { // TODO cluster config will go into akka-cluster-reference.conf when we enable that module val ClusterName = getString("akka.cluster.name") + val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_)) val NodeName: String = config.getString("akka.cluster.nodename") match { case "" ⇒ new UUID().toString @@ -65,7 +68,6 @@ object RemoteExtension { } class RemoteServerSettings { - import scala.collection.JavaConverters._ val MessageFrameSize = config.getInt("akka.remote.server.message-frame-size") val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { case "" ⇒ None