2011-12-21 16:07:18 +04:00
|
|
|
/**
 * Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
 */
|
|
|
|
|
package akka.remote
|
|
|
|
|
|
|
|
|
|
import org.apache.zookeeper._
|
|
|
|
|
import ZooDefs.Ids
|
|
|
|
|
|
2011-12-22 17:34:20 +04:00
|
|
|
/**
 * ZooKeeper client shared by all barriers created through [[ZkClient.barrier]].
 *
 * The object itself is the `Watcher` handed to the ZooKeeper connection: every
 * watched event wakes the threads that are waiting on this object's monitor so
 * that they can re-check the barrier condition.
 */
object ZkClient extends Watcher {
  // Don't forget to close!
  /**
   * Lazily created ZooKeeper connection. The connect string is built from the
   * comma-separated node list in `AkkaRemoteSpec.testNodes`.
   */
  lazy val zk: ZooKeeper = {
    val remoteNodes = AkkaRemoteSpec.testNodes split ','

    // ZkServers are configured to listen on a specific port.
    val connectString = remoteNodes map (_ + ":2181") mkString ","
    new ZooKeeper(connectString, 3000, this)
  }

  /**
   * Watcher callback: wakes every thread waiting on this object's monitor.
   *
   * `notifyAll` (rather than `notify`) is used so that no waiter is left asleep
   * when several threads are blocked in barriers at the same time; each waiter
   * re-checks its own condition after waking up.
   *
   * @param ev the event delivered by ZooKeeper (its content is not inspected)
   */
  def process(ev: WatchedEvent): Unit = {
    synchronized { notifyAll() }
  }

  /**
   * Barrier implemented with child nodes under `root`.
   *
   * Constructing an instance blocks until the server answers an `exists("/")`
   * call and then makes sure the persistent `root` node exists.
   *
   * @param name  name of the node this participant creates under `root`
   * @param count number of participants that must enter before anyone proceeds
   * @param root  path of the parent node that collects the participants' nodes
   */
  class ZkBarrier(name: String, count: Int, root: String) extends Barrier {
    /**
     * Retries `exists("/")` until it no longer fails with a connection loss,
     * sleeping 10 seconds between attempts.
     */
    @annotation.tailrec
    private def waitForServer(): Unit = {
      // SI-1672: the recursive call cannot sit inside the try/catch and still
      // satisfy @tailrec, so the try only produces a flag.
      val connected = try {
        zk.exists("/", false)
        true
      } catch {
        case _: KeeperException.ConnectionLossException =>
          Thread.sleep(10000)
          false
      }
      if (!connected) waitForServer()
    }
    waitForServer()

    try {
      zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
    } catch {
      // The root node has already been created; nothing to do.
      case _: KeeperException.NodeExistsException =>
    }

    /** Upper bound, in milliseconds, for a single `enter()` or `leave()` wait. */
    val timeoutMs = 300 * 1000

    /**
     * Blocks until `root` has at least `num` children.
     *
     * @param num number of children to wait for
     * @throws InterruptedException when the condition is not met within `timeoutMs`
     */
    private def block(num: Int): Unit = {
      val start = System.currentTimeMillis

      @annotation.tailrec
      def awaitChildren(): Unit = {
        if (System.currentTimeMillis - start > timeoutMs)
          throw new InterruptedException("Timed out blocking in zk")

        // The watch is registered and the wait is entered while holding the
        // monitor that process() needs, so a notification cannot slip in
        // between the check and the wait.
        val reached = ZkClient.this.synchronized {
          val enough = zk.getChildren(root, true).size >= num
          if (!enough) {
            // Wait only for what is left of the overall timeout; waiting the
            // full timeoutMs after every wake-up could nearly double the total
            // blocking time. Clamp to 1 ms because wait(0) means "forever".
            val remainingMs = timeoutMs - (System.currentTimeMillis - start)
            ZkClient.this.wait(math.max(1L, remainingMs))
          }
          enough
        }
        if (!reached) awaitChildren()
      }

      awaitChildren()
    }

    /**
     * Registers this participant with an ephemeral node `root/name` and blocks
     * until `count` children exist under `root`.
     */
    def enter(): Unit = {
      zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL)

      block(count)
    }

    /**
     * Creates the ephemeral node `root/name.leave` and blocks until
     * `2 * count` children exist under `root`.
     */
    final def leave(): Unit = {
      zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL)

      block(2 * count)
    }
  }

  /**
   * Creates a [[ZkBarrier]]; see its constructor for the blocking behaviour.
   *
   * @param name  node name of this participant
   * @param count number of participants
   * @param root  parent node path of the barrier
   */
  def barrier(name: String, count: Int, root: String): ZkBarrier = new ZkBarrier(name, count, root)
}
|