ZK related fixed. No review since there's nothing to review.
This commit is contained in:
parent
17125af571
commit
7b9e9d2fe0
2 changed files with 38 additions and 9 deletions
|
|
@ -6,12 +6,14 @@ package akka.remote
|
|||
import com.typesafe.config.Config
|
||||
import org.apache.zookeeper._
|
||||
import ZooDefs.Ids
|
||||
import collection.JavaConversions._
|
||||
import java.net.InetAddress
|
||||
|
||||
object ZkClient extends Watcher {
|
||||
// Don't forget to close!
|
||||
lazy val zk: ZooKeeper = {
|
||||
val remoteNodes = AkkaRemoteSpec.testNodes split ','
|
||||
|
||||
|
||||
// ZkServers are configured to listen on a specific port.
|
||||
val connectString = remoteNodes map (_+":2181") mkString ","
|
||||
new ZooKeeper(connectString, 3000, this)
|
||||
|
|
@ -22,29 +24,56 @@ object ZkClient extends Watcher {
|
|||
}
|
||||
|
||||
class ZkBarrier(name: String, count: Int, root: String) extends Barrier {
|
||||
if (zk.exists(root, false) eq null) {
|
||||
zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
|
||||
@annotation.tailrec
|
||||
private def waitForServer() {
|
||||
// SI-1672
|
||||
val r = try {
|
||||
zk.exists("/", false); true
|
||||
} catch {
|
||||
case _: KeeperException.ConnectionLossException =>
|
||||
println("Server is not ready, sleeping...")
|
||||
Thread.sleep(10000)
|
||||
false
|
||||
}
|
||||
if (!r) waitForServer()
|
||||
}
|
||||
waitForServer()
|
||||
|
||||
try {
|
||||
zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
} catch {
|
||||
case _: KeeperException.NodeExistsException =>
|
||||
}
|
||||
|
||||
def enter() {
|
||||
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.EPHEMERAL)
|
||||
|
||||
while (true) {
|
||||
ZkClient.this.synchronized {
|
||||
if (zk.getChildren(root, true).size < count) {
|
||||
val children = zk.getChildren(root, true)
|
||||
println("Enter, children: " + children.mkString(","))
|
||||
if (children.size < count) {
|
||||
println("waiting")
|
||||
ZkClient.this.wait()
|
||||
}
|
||||
} else
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def leave() {
|
||||
final def leave() {
|
||||
zk.delete(root + "/" + name, -1)
|
||||
|
||||
while (true) {
|
||||
ZkClient.this.synchronized {
|
||||
if (!zk.getChildren(root, true).isEmpty) {
|
||||
val children = zk.getChildren(root, true)
|
||||
println("Leave, children: " + children.mkString(","))
|
||||
if (!children.isEmpty) {
|
||||
println("waiting")
|
||||
ZkClient.this.wait()
|
||||
}
|
||||
} else
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue