Final fix to zk client, distributed testing for akka works. Happy NY to everyone!

This commit is contained in:
Eugene Vigdorchik 2011-12-31 11:33:55 +04:00
parent 2eb7cf31b5
commit 5d47342d31
2 changed files with 22 additions and 24 deletions

View file

@ -3,11 +3,8 @@
*/
package akka.remote
import com.typesafe.config.Config
import org.apache.zookeeper._
import ZooDefs.Ids
import collection.JavaConversions._
import java.net.InetAddress
object ZkClient extends Watcher {
// Don't forget to close!
@ -45,37 +42,38 @@ object ZkClient extends Watcher {
case _: KeeperException.NodeExistsException =>
}
def enter() {
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL)
val timeoutMs = 300*1000
private def block(num: Int) {
val start = System.currentTimeMillis
while (true) {
if (System.currentTimeMillis - start > timeoutMs)
throw new InterruptedException("Timed out blocking in zk")
ZkClient.this.synchronized {
val children = zk.getChildren(root, true)
println("Enter, children: " + children.mkString(","))
if (children.size < count) {
println("waiting")
ZkClient.this.wait()
if (children.size < num) {
ZkClient.this.wait(timeoutMs)
} else
return
}
}
}
final def leave() {
zk.delete(root + "/" + name, -1)
def enter() {
println("ZK creating " + root + "/" + name)
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL)
while (true) {
ZkClient.this.synchronized {
val children = zk.getChildren(root, true)
println("Leave, children: " + children.mkString(","))
if (!children.isEmpty) {
println("waiting")
ZkClient.this.wait()
} else
return
}
}
block(count)
}
final def leave() {
println("ZK leaving " + root + "/" + name)
zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL)
block(2*count)
}
}