diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala index 014afd0c54..2be4c156f2 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala @@ -43,7 +43,7 @@ object MultiJvmSync { val barrier = if (AkkaRemoteSpec.testNodes eq null) new FileBasedBarrier(name, count, testName, nodeName, timeout) else - new ZkClient.ZkBarrier(name + "_" + nodeName, count, "/" + testName) + new ZkClient.ZkBarrier(nodeName, count, "/" + testName + "_" + name) barrier.await() } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala index d6ef72ae22..52f5dc3b33 100755 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala @@ -3,11 +3,8 @@ */ package akka.remote -import com.typesafe.config.Config import org.apache.zookeeper._ import ZooDefs.Ids -import collection.JavaConversions._ -import java.net.InetAddress object ZkClient extends Watcher { // Don't forget to close! @@ -45,37 +42,38 @@ object ZkClient extends Watcher { case _: KeeperException.NodeExistsException => } - def enter() { - zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL) + val timeoutMs = 300*1000 + private def block(num: Int) { + val start = System.currentTimeMillis while (true) { + if (System.currentTimeMillis - start > timeoutMs) + throw new InterruptedException("Timed out blocking in zk") + ZkClient.this.synchronized { val children = zk.getChildren(root, true) - println("Enter, children: " + children.mkString(",")) - if (children.size < count) { - println("waiting") - ZkClient.this.wait() + if (children.size < num) { + ZkClient.this.wait(timeoutMs) } else return } } } - final def leave() { - zk.delete(root + "/" + name, -1) + def enter() { + println("ZK creating " + root + "/" + name) + zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL) - while (true) { - ZkClient.this.synchronized { - val children = zk.getChildren(root, true) - println("Leave, children: " + children.mkString(",")) - if (!children.isEmpty) { - println("waiting") - ZkClient.this.wait() - } else - return - } - } + block(count) + } + + final def leave() { + println("ZK leaving " + root + "/" + name) + zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL) + + block(2*count) } }