Used ZkClient for distributed testing.

This commit is contained in:
Eugene Vigdorchik 2011-12-22 17:34:20 +04:00
parent 440e5a5f69
commit 0c4c84d1dc
4 changed files with 15 additions and 20 deletions

View file

@ -7,7 +7,7 @@ trait AbstractRemoteActorMultiJvmSpec {
def commonConfig: Config
def remotes: Seq[String] = {
val arrayOpt = Option(System.getProperty("test.hosts")).map(_ split ",")
val arrayOpt = Option(AkkaRemoteSpec.testNodes).map(_ split ",")
(arrayOpt getOrElse Array.fill(NrOfNodes)("localhost")).toSeq
}

View file

@ -24,6 +24,8 @@ object AkkaRemoteSpec {
.withFallback(ConfigFactory.defaultReference).resolve(ConfigResolveOptions.defaults)
}
}
val testNodes = System.getProperty("test.hosts")
}
abstract class AkkaRemoteSpec(config: Config)

View file

@ -38,17 +38,12 @@ object MultiJvmSync {
def end(className: String, count: Int) = barrier(EndBarrier, count, className)
def testName(className: String) = {
val i = className.indexOf(TestMarker)
if (i >= 0) className.substring(0, i) else className
}
def nodeName(className: String) = {
val i = className.indexOf(TestMarker)
if (i >= 0) className.substring(i + TestMarker.length) else className
}
def barrier(name: String, count: Int, className: String, timeout: Duration = FileBasedBarrier.DefaultTimeout) = {
new FileBasedBarrier(name, count, testName(className), nodeName(className), timeout).await()
val Array(testName, nodeName) = className split TestMarker
val barrier = if (AkkaRemoteSpec.testNodes eq null)
new FileBasedBarrier(name, count, testName, nodeName, timeout)
else
new ZkClient.ZkBarrier(name + "_" + nodeName, count, testName)
barrier.await()
}
}

View file

@ -7,12 +7,10 @@ import com.typesafe.config.Config
import org.apache.zookeeper._
import ZooDefs.Ids
object ZKClient extends Watcher {
object ZkClient extends Watcher {
// Don't forget to close!
lazy val zk: ZooKeeper = {
val remoteNodes = AkkaRemoteSpec.testConf.getString("akka.test.remote.nodes") split ',' map {
case hostport => hostport.split(":")(0)
}
val remoteNodes = AkkaRemoteSpec.testNodes split ','
// ZkServers are configured to listen on a specific port.
val connectString = remoteNodes map (_+":2181") mkString ","
@ -32,9 +30,9 @@ object ZKClient extends Watcher {
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL)
while (true) {
ZKClient.this.synchronized {
ZkClient.this.synchronized {
if (zk.getChildren(root, true).size < count) {
ZKClient.this.wait()
ZkClient.this.wait()
}
}
}
@ -43,9 +41,9 @@ object ZKClient extends Watcher {
def leave() {
zk.delete(root + "/" + name, -1)
while (true) {
ZKClient.this.synchronized {
ZkClient.this.synchronized {
if (!zk.getChildren(root, true).isEmpty) {
ZKClient.this.wait()
ZkClient.this.wait()
}
}
}