Changed line endings from DOS to UNIX
This commit is contained in:
parent
4162372024
commit
575ae92fb9
3 changed files with 125 additions and 125 deletions
|
|
@ -1,28 +1,28 @@
|
|||
package akka.remote
|
||||
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
|
||||
trait AbstractRemoteActorMultiJvmSpec {
|
||||
def NrOfNodes: Int
|
||||
def commonConfig: Config
|
||||
|
||||
private[this] val remotes: IndexedSeq[String] = {
|
||||
val nodesOpt = Option(AkkaRemoteSpec.testNodes).map(_.split(",").toIndexedSeq)
|
||||
nodesOpt getOrElse IndexedSeq.fill(NrOfNodes)("localhost")
|
||||
}
|
||||
|
||||
def akkaSpec(idx: Int) = "AkkaRemoteSpec@%s:%d".format(remotes(idx), 9991+idx)
|
||||
|
||||
def akkaURIs(count: Int): String = {
|
||||
0 until count map {idx => "\"akka://" + akkaSpec(idx) + "\""} mkString ","
|
||||
}
|
||||
|
||||
val nodeConfigs = ((1 to NrOfNodes).toList zip remotes) map {
|
||||
case (idx, host) =>
|
||||
ConfigFactory.parseString("""
|
||||
akka {
|
||||
remote.netty.hostname="%s"
|
||||
remote.netty.port = "%d"
|
||||
}""".format(host, 9990+idx, idx)) withFallback commonConfig
|
||||
}
|
||||
}
|
||||
package akka.remote
|
||||
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
|
||||
trait AbstractRemoteActorMultiJvmSpec {
|
||||
def NrOfNodes: Int
|
||||
def commonConfig: Config
|
||||
|
||||
private[this] val remotes: IndexedSeq[String] = {
|
||||
val nodesOpt = Option(AkkaRemoteSpec.testNodes).map(_.split(",").toIndexedSeq)
|
||||
nodesOpt getOrElse IndexedSeq.fill(NrOfNodes)("localhost")
|
||||
}
|
||||
|
||||
def akkaSpec(idx: Int) = "AkkaRemoteSpec@%s:%d".format(remotes(idx), 9991+idx)
|
||||
|
||||
def akkaURIs(count: Int): String = {
|
||||
0 until count map {idx => "\"akka://" + akkaSpec(idx) + "\""} mkString ","
|
||||
}
|
||||
|
||||
val nodeConfigs = ((1 to NrOfNodes).toList zip remotes) map {
|
||||
case (idx, host) =>
|
||||
ConfigFactory.parseString("""
|
||||
akka {
|
||||
remote.netty.hostname="%s"
|
||||
remote.netty.port = "%d"
|
||||
}""".format(host, 9990+idx, idx)) withFallback commonConfig
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,19 +1,19 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remote
|
||||
|
||||
trait Barrier {
|
||||
def await() = { enter(); leave() }
|
||||
|
||||
def apply(body: ⇒ Unit) {
|
||||
enter()
|
||||
body
|
||||
leave()
|
||||
}
|
||||
|
||||
def enter(): Unit
|
||||
|
||||
def leave(): Unit
|
||||
}
|
||||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remote
|
||||
|
||||
trait Barrier {
|
||||
def await() = { enter(); leave() }
|
||||
|
||||
def apply(body: ⇒ Unit) {
|
||||
enter()
|
||||
body
|
||||
leave()
|
||||
}
|
||||
|
||||
def enter(): Unit
|
||||
|
||||
def leave(): Unit
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,78 +1,78 @@
|
|||
/**
|
||||
* Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
|
||||
*/
|
||||
package akka.remote
|
||||
|
||||
import org.apache.zookeeper._
|
||||
import ZooDefs.Ids
|
||||
|
||||
object ZkClient extends Watcher {
|
||||
// Don't forget to close!
|
||||
lazy val zk: ZooKeeper = {
|
||||
val remoteNodes = AkkaRemoteSpec.testNodes split ','
|
||||
|
||||
// ZkServers are configured to listen on a specific port.
|
||||
val connectString = remoteNodes map (_+":2181") mkString ","
|
||||
new ZooKeeper(connectString, 3000, this)
|
||||
}
|
||||
|
||||
def process(ev: WatchedEvent) {
|
||||
synchronized { notify() }
|
||||
}
|
||||
|
||||
class ZkBarrier(name: String, count: Int, root: String) extends Barrier {
|
||||
@annotation.tailrec
|
||||
private def waitForServer() {
|
||||
// SI-1672
|
||||
val r = try {
|
||||
zk.exists("/", false); true
|
||||
} catch {
|
||||
case _: KeeperException.ConnectionLossException =>
|
||||
Thread.sleep(10000)
|
||||
false
|
||||
}
|
||||
if (!r) waitForServer()
|
||||
}
|
||||
waitForServer()
|
||||
|
||||
try {
|
||||
zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
} catch {
|
||||
case _: KeeperException.NodeExistsException =>
|
||||
}
|
||||
|
||||
val timeoutMs = 300*1000
|
||||
|
||||
private def block(num: Int) {
|
||||
val start = System.currentTimeMillis
|
||||
while (true) {
|
||||
if (System.currentTimeMillis - start > timeoutMs)
|
||||
throw new InterruptedException("Timed out blocking in zk")
|
||||
|
||||
ZkClient.this.synchronized {
|
||||
val children = zk.getChildren(root, true)
|
||||
if (children.size < num) {
|
||||
ZkClient.this.wait(timeoutMs)
|
||||
} else
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def enter() {
|
||||
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.EPHEMERAL)
|
||||
|
||||
block(count)
|
||||
}
|
||||
|
||||
final def leave() {
|
||||
zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.EPHEMERAL)
|
||||
|
||||
block(2*count)
|
||||
}
|
||||
}
|
||||
|
||||
def barrier(name: String, count: Int, root: String) = new ZkBarrier(name, count, root)
|
||||
}
|
||||
/**
|
||||
* Copyright (C) 2011-2012 Typesafe <http://typesafe.com/>
|
||||
*/
|
||||
package akka.remote
|
||||
|
||||
import org.apache.zookeeper._
|
||||
import ZooDefs.Ids
|
||||
|
||||
object ZkClient extends Watcher {
|
||||
// Don't forget to close!
|
||||
lazy val zk: ZooKeeper = {
|
||||
val remoteNodes = AkkaRemoteSpec.testNodes split ','
|
||||
|
||||
// ZkServers are configured to listen on a specific port.
|
||||
val connectString = remoteNodes map (_+":2181") mkString ","
|
||||
new ZooKeeper(connectString, 3000, this)
|
||||
}
|
||||
|
||||
def process(ev: WatchedEvent) {
|
||||
synchronized { notify() }
|
||||
}
|
||||
|
||||
class ZkBarrier(name: String, count: Int, root: String) extends Barrier {
|
||||
@annotation.tailrec
|
||||
private def waitForServer() {
|
||||
// SI-1672
|
||||
val r = try {
|
||||
zk.exists("/", false); true
|
||||
} catch {
|
||||
case _: KeeperException.ConnectionLossException =>
|
||||
Thread.sleep(10000)
|
||||
false
|
||||
}
|
||||
if (!r) waitForServer()
|
||||
}
|
||||
waitForServer()
|
||||
|
||||
try {
|
||||
zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
} catch {
|
||||
case _: KeeperException.NodeExistsException =>
|
||||
}
|
||||
|
||||
val timeoutMs = 300*1000
|
||||
|
||||
private def block(num: Int) {
|
||||
val start = System.currentTimeMillis
|
||||
while (true) {
|
||||
if (System.currentTimeMillis - start > timeoutMs)
|
||||
throw new InterruptedException("Timed out blocking in zk")
|
||||
|
||||
ZkClient.this.synchronized {
|
||||
val children = zk.getChildren(root, true)
|
||||
if (children.size < num) {
|
||||
ZkClient.this.wait(timeoutMs)
|
||||
} else
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def enter() {
|
||||
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.EPHEMERAL)
|
||||
|
||||
block(count)
|
||||
}
|
||||
|
||||
final def leave() {
|
||||
zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.EPHEMERAL)
|
||||
|
||||
block(2*count)
|
||||
}
|
||||
}
|
||||
|
||||
def barrier(name: String, count: Int, root: String) = new ZkBarrier(name, count, root)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue