diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala index 3eeab35bd0..014afd0c54 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala @@ -43,7 +43,7 @@ object MultiJvmSync { val barrier = if (AkkaRemoteSpec.testNodes eq null) new FileBasedBarrier(name, count, testName, nodeName, timeout) else - new ZkClient.ZkBarrier(name + "_" + nodeName, count, testName) + new ZkClient.ZkBarrier(name + "_" + nodeName, count, "/" + testName) barrier.await() } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala index 225be41d76..d6ef72ae22 100755 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala @@ -6,12 +6,14 @@ package akka.remote import com.typesafe.config.Config import org.apache.zookeeper._ import ZooDefs.Ids +import collection.JavaConversions._ +import java.net.InetAddress object ZkClient extends Watcher { // Don't forget to close! lazy val zk: ZooKeeper = { val remoteNodes = AkkaRemoteSpec.testNodes split ',' - + // ZkServers are configured to listen on a specific port. val connectString = remoteNodes map (_+":2181") mkString "," new ZooKeeper(connectString, 3000, this) @@ -22,29 +24,56 @@ object ZkClient extends Watcher { } class ZkBarrier(name: String, count: Int, root: String) extends Barrier { - if (zk.exists(root, false) eq null) { - zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + @annotation.tailrec + private def waitForServer() { + // SI-1672 + val r = try { + zk.exists("/", false); true + } catch { + case _: KeeperException.ConnectionLossException => + println("Server is not ready, sleeping...") + Thread.sleep(10000) + false + } + if (!r) waitForServer() + } + waitForServer() + + try { + zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + } catch { + case _: KeeperException.NodeExistsException => } def enter() { zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + while (true) { ZkClient.this.synchronized { - if (zk.getChildren(root, true).size < count) { + val children = zk.getChildren(root, true) + println("Enter, children: " + children.mkString(",")) + if (children.size < count) { + println("waiting") ZkClient.this.wait() - } + } else + return } } } - def leave() { + final def leave() { zk.delete(root + "/" + name, -1) + while (true) { ZkClient.this.synchronized { - if (!zk.getChildren(root, true).isEmpty) { + val children = zk.getChildren(root, true) + println("Leave, children: " + children.mkString(",")) + if (!children.isEmpty) { + println("waiting") ZkClient.this.wait() - } + } else + return } } }