Merge branch 'master' into wip-nostart
This commit is contained in:
commit
6114df1efd
9 changed files with 186 additions and 169 deletions
|
|
@ -289,15 +289,15 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
|
|||
|
||||
"firstCompletedOf" in {
|
||||
val futures = Vector.fill[Future[Int]](10)(new DefaultPromise[Int]()) :+ new KeptPromise[Int](Right(5))
|
||||
Futures.firstCompletedOf(futures).get must be(5)
|
||||
Future.firstCompletedOf(futures).get must be(5)
|
||||
}
|
||||
|
||||
"find" in {
|
||||
val futures = for (i ← 1 to 10) yield Future { i }
|
||||
val result = Futures.find[Int](_ == 3)(futures)
|
||||
val result = Future.find[Int](_ == 3)(futures)
|
||||
result.get must be(Some(3))
|
||||
|
||||
val notFound = Futures.find[Int](_ == 11)(futures)
|
||||
val notFound = Future.find[Int](_ == 11)(futures)
|
||||
notFound.get must be(None)
|
||||
}
|
||||
|
||||
|
|
@ -309,7 +309,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
|
|||
}
|
||||
val timeout = 10000
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] }
|
||||
Futures.fold(0, timeout)(futures)(_ + _).get must be(45)
|
||||
Future.fold(0, timeout)(futures)(_ + _).get must be(45)
|
||||
}
|
||||
|
||||
"fold by composing" in {
|
||||
|
|
@ -336,7 +336,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
|
|||
}
|
||||
val timeout = 10000
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] }
|
||||
Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected")
|
||||
Future.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -344,7 +344,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
|
|||
import scala.collection.mutable.ArrayBuffer
|
||||
def test(testNumber: Int) {
|
||||
val fs = (0 to 1000) map (i ⇒ Future(i, 10000))
|
||||
val result = Futures.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) {
|
||||
val result = Future.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) {
|
||||
case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef]
|
||||
case (l, _) ⇒ l
|
||||
}.get.asInstanceOf[ArrayBuffer[Int]].sum
|
||||
|
|
@ -356,7 +356,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
|
|||
}
|
||||
|
||||
"return zero value if folding empty list" in {
|
||||
Futures.fold(0)(List[Future[Int]]())(_ + _).get must be(0)
|
||||
Future.fold(0)(List[Future[Int]]())(_ + _).get must be(0)
|
||||
}
|
||||
|
||||
"shouldReduceResults" in {
|
||||
|
|
@ -367,7 +367,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
|
|||
}
|
||||
val timeout = 10000
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] }
|
||||
assert(Futures.reduce(futures, timeout)(_ + _).get === 45)
|
||||
assert(Future.reduce(futures, timeout)(_ + _).get === 45)
|
||||
}
|
||||
|
||||
"shouldReduceResultsWithException" in {
|
||||
|
|
@ -384,13 +384,13 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
|
|||
}
|
||||
val timeout = 10000
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] }
|
||||
assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
|
||||
assert(Future.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
|
||||
}
|
||||
}
|
||||
|
||||
"shouldReduceThrowIAEOnEmptyInput" in {
|
||||
filterException[IllegalArgumentException] {
|
||||
intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get }
|
||||
intercept[UnsupportedOperationException] { Future.reduce(List[Future[Int]]())(_ + _).get }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -64,45 +64,13 @@ object Futures {
|
|||
def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
|
||||
Future(body.call)(dispatcher, timeout)
|
||||
|
||||
/**
|
||||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = {
|
||||
val futureResult = new DefaultPromise[T](timeout)
|
||||
|
||||
val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _)
|
||||
futures.foreach(_ onComplete completeFirst)
|
||||
|
||||
futureResult
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
|
||||
*/
|
||||
def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = {
|
||||
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
|
||||
else {
|
||||
val result = new DefaultPromise[Option[T]](timeout)
|
||||
val ref = new AtomicInteger(futures.size)
|
||||
val search: Future[T] ⇒ Unit = f ⇒ try {
|
||||
f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r))
|
||||
} finally {
|
||||
if (ref.decrementAndGet == 0)
|
||||
result completeWithResult None
|
||||
}
|
||||
futures.foreach(_ onComplete search)
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
|
||||
*/
|
||||
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = {
|
||||
val pred: T ⇒ Boolean = predicate.apply(_)
|
||||
find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_))
|
||||
Future.find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -110,59 +78,7 @@ object Futures {
|
|||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] =
|
||||
firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
|
||||
|
||||
/**
|
||||
* A non-blocking fold over the specified futures.
|
||||
* The fold is performed on the thread where the last future is completed,
|
||||
* the result will be the first failure of any of the futures, or any failure in the actual fold,
|
||||
* or the result of the fold.
|
||||
* Example:
|
||||
* <pre>
|
||||
* val result = Futures.fold(0)(futures)(_ + _).await.result
|
||||
* </pre>
|
||||
*/
|
||||
def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = {
|
||||
if (futures.isEmpty) {
|
||||
new KeptPromise[R](Right(zero))
|
||||
} else {
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val results = new ConcurrentLinkedQueue[T]()
|
||||
val done = new Switch(false)
|
||||
val allDone = futures.size
|
||||
|
||||
val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
|
||||
f.value.get match {
|
||||
case Right(value) ⇒
|
||||
val added = results add value
|
||||
if (added && results.size == allDone) { //Only one thread can get here
|
||||
if (done.switchOn) {
|
||||
try {
|
||||
val i = results.iterator
|
||||
var currentValue = zero
|
||||
while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
|
||||
result completeWithResult currentValue
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
result completeWithException e
|
||||
} finally {
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
}
|
||||
case Left(exception) ⇒
|
||||
if (done.switchOn) {
|
||||
result completeWithException exception
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
futures foreach { _ onComplete aggregate }
|
||||
result
|
||||
}
|
||||
}
|
||||
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
|
|
@ -172,50 +88,24 @@ object Futures {
|
|||
* or the result of the fold.
|
||||
*/
|
||||
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
|
||||
fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
|
||||
Future.fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
|
||||
|
||||
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
|
||||
|
||||
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun)
|
||||
|
||||
/**
|
||||
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
|
||||
* Example:
|
||||
* <pre>
|
||||
* val result = Futures.reduce(futures)(_ + _).await.result
|
||||
* </pre>
|
||||
*/
|
||||
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = {
|
||||
if (futures.isEmpty)
|
||||
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
|
||||
else {
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val seedFound = new AtomicBoolean(false)
|
||||
val seedFold: Future[T] ⇒ Unit = f ⇒ {
|
||||
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
|
||||
f.value.get match {
|
||||
case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
|
||||
case Left(exception) ⇒ result.completeWithException(exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
for (f ← futures) f onComplete seedFold //Attach the listener to the Futures
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
|
||||
*/
|
||||
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] =
|
||||
reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
|
||||
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
|
||||
|
||||
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
* Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
|
||||
* Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
|
||||
* Useful for reducing many Futures into a single Future.
|
||||
*/
|
||||
def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] =
|
||||
|
|
@ -298,6 +188,116 @@ object Future {
|
|||
def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
|
||||
sequence(in)(cbf, timeout)
|
||||
|
||||
/**
|
||||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = {
|
||||
val futureResult = new DefaultPromise[T](timeout)
|
||||
|
||||
val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _)
|
||||
futures.foreach(_ onComplete completeFirst)
|
||||
|
||||
futureResult
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
|
||||
*/
|
||||
def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = {
|
||||
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
|
||||
else {
|
||||
val result = new DefaultPromise[Option[T]](timeout)
|
||||
val ref = new AtomicInteger(futures.size)
|
||||
val search: Future[T] ⇒ Unit = f ⇒ try {
|
||||
f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r))
|
||||
} finally {
|
||||
if (ref.decrementAndGet == 0)
|
||||
result completeWithResult None
|
||||
}
|
||||
futures.foreach(_ onComplete search)
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A non-blocking fold over the specified futures.
|
||||
* The fold is performed on the thread where the last future is completed,
|
||||
* the result will be the first failure of any of the futures, or any failure in the actual fold,
|
||||
* or the result of the fold.
|
||||
* Example:
|
||||
* <pre>
|
||||
* val result = Futures.fold(0)(futures)(_ + _).await.result
|
||||
* </pre>
|
||||
*/
|
||||
def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = {
|
||||
if (futures.isEmpty) {
|
||||
new KeptPromise[R](Right(zero))
|
||||
} else {
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val results = new ConcurrentLinkedQueue[T]()
|
||||
val done = new Switch(false)
|
||||
val allDone = futures.size
|
||||
|
||||
val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
|
||||
f.value.get match {
|
||||
case Right(value) ⇒
|
||||
val added = results add value
|
||||
if (added && results.size == allDone) { //Only one thread can get here
|
||||
if (done.switchOn) {
|
||||
try {
|
||||
val i = results.iterator
|
||||
var currentValue = zero
|
||||
while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
|
||||
result completeWithResult currentValue
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
result completeWithException e
|
||||
} finally {
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
}
|
||||
case Left(exception) ⇒
|
||||
if (done.switchOn) {
|
||||
result completeWithException exception
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
futures foreach { _ onComplete aggregate }
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
|
||||
* Example:
|
||||
* <pre>
|
||||
* val result = Futures.reduce(futures)(_ + _).await.result
|
||||
* </pre>
|
||||
*/
|
||||
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = {
|
||||
if (futures.isEmpty)
|
||||
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
|
||||
else {
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val seedFound = new AtomicBoolean(false)
|
||||
val seedFold: Future[T] ⇒ Unit = f ⇒ {
|
||||
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
|
||||
f.value.get match {
|
||||
case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
|
||||
case Left(exception) ⇒ result.completeWithException(exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
for (f ← futures) f onComplete seedFold //Attach the listener to the Futures
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B].
|
||||
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
||||
|
|
|
|||
|
|
@ -568,5 +568,5 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
|
|||
*/
|
||||
class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter {
|
||||
|
||||
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Futures.firstCompletedOf(results)
|
||||
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ class TransactionLog private (
|
|||
def recordEntry(entry: Array[Byte]) {
|
||||
if (isOpen.isOn) {
|
||||
val entryBytes =
|
||||
if (Cluster.shouldCompressData) LZF.compress(entry)
|
||||
if (shouldCompressData) LZF.compress(entry)
|
||||
else entry
|
||||
|
||||
try {
|
||||
|
|
@ -118,7 +118,7 @@ class TransactionLog private (
|
|||
def recordSnapshot(snapshot: Array[Byte]) {
|
||||
if (isOpen.isOn) {
|
||||
val snapshotBytes =
|
||||
if (Cluster.shouldCompressData) LZF.compress(snapshot)
|
||||
if (shouldCompressData) LZF.compress(snapshot)
|
||||
else snapshot
|
||||
|
||||
try {
|
||||
|
|
@ -311,7 +311,7 @@ class TransactionLog private (
|
|||
while (enumeration.hasMoreElements) {
|
||||
val bytes = enumeration.nextElement.getEntry
|
||||
val entry =
|
||||
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
|
||||
if (shouldCompressData) LZF.uncompress(bytes)
|
||||
else bytes
|
||||
entries = entries :+ entry
|
||||
}
|
||||
|
|
@ -356,6 +356,10 @@ class TransactionLog private (
|
|||
*/
|
||||
object TransactionLog {
|
||||
|
||||
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
|
||||
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
|
||||
val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
|
||||
|
||||
val digestType = config.getString("akka.cluster.replication.digest-type", "CRC32") match {
|
||||
case "CRC32" ⇒ BookKeeper.DigestType.CRC32
|
||||
case "MAC" ⇒ BookKeeper.DigestType.MAC
|
||||
|
|
@ -367,40 +371,17 @@ object TransactionLog {
|
|||
val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2)
|
||||
val snapshotFrequency = config.getInt("akka.cluster.replication.snapshot-frequency", 1000)
|
||||
val timeout = Duration(config.getInt("akka.cluster.replication.timeout", 30), TIME_UNIT).toMillis
|
||||
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
|
||||
|
||||
private[akka] val transactionLogNode = "/transaction-log-ids"
|
||||
|
||||
private val isConnected = new Switch(false)
|
||||
|
||||
private[akka] lazy val (bookieClient, zkClient) = {
|
||||
val bk = new BookKeeper(Cluster.zooKeeperServers)
|
||||
@volatile
|
||||
private[akka] var bookieClient: BookKeeper = _
|
||||
|
||||
val zk = new AkkaZkClient(
|
||||
Cluster.zooKeeperServers,
|
||||
Cluster.sessionTimeout,
|
||||
Cluster.connectionTimeout,
|
||||
Cluster.defaultZooKeeperSerializer)
|
||||
|
||||
try {
|
||||
zk.create(transactionLogNode, null, CreateMode.PERSISTENT)
|
||||
} catch {
|
||||
case e: ZkNodeExistsException ⇒ {} // do nothing
|
||||
case e: Throwable ⇒ handleError(e)
|
||||
}
|
||||
|
||||
EventHandler.info(this,
|
||||
("Transaction log service started with" +
|
||||
"\n\tdigest type [%s]" +
|
||||
"\n\tensemble size [%s]" +
|
||||
"\n\tquorum size [%s]" +
|
||||
"\n\tlogging time out [%s]").format(
|
||||
digestType,
|
||||
ensembleSize,
|
||||
quorumSize,
|
||||
timeout))
|
||||
isConnected.switchOn
|
||||
(bk, zk)
|
||||
}
|
||||
@volatile
|
||||
private[akka] var zkClient: AkkaZkClient = _
|
||||
|
||||
private[akka] def apply(
|
||||
ledger: LedgerHandle,
|
||||
|
|
@ -409,6 +390,34 @@ object TransactionLog {
|
|||
replicationScheme: ReplicationScheme) =
|
||||
new TransactionLog(ledger, id, isAsync, replicationScheme)
|
||||
|
||||
/**
|
||||
* Starts up the transaction log.
|
||||
*/
|
||||
def start(): Unit = {
|
||||
isConnected switchOn {
|
||||
bookieClient = new BookKeeper(zooKeeperServers)
|
||||
zkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout)
|
||||
|
||||
try {
|
||||
zkClient.create(transactionLogNode, null, CreateMode.PERSISTENT)
|
||||
} catch {
|
||||
case e: ZkNodeExistsException ⇒ {} // do nothing
|
||||
case e: Throwable ⇒ handleError(e)
|
||||
}
|
||||
|
||||
EventHandler.info(this,
|
||||
("Transaction log service started with" +
|
||||
"\n\tdigest type [%s]" +
|
||||
"\n\tensemble size [%s]" +
|
||||
"\n\tquorum size [%s]" +
|
||||
"\n\tlogging time out [%s]").format(
|
||||
digestType,
|
||||
ensembleSize,
|
||||
quorumSize,
|
||||
timeout))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the transaction log.
|
||||
*/
|
||||
|
|
@ -575,10 +584,12 @@ object LocalBookKeeperEnsemble {
|
|||
*/
|
||||
def start() {
|
||||
isRunning switchOn {
|
||||
EventHandler.info(this, "Starting up LocalBookKeeperEnsemble...")
|
||||
localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize)
|
||||
localBookKeeper.runZookeeper(port)
|
||||
localBookKeeper.initializeZookeper()
|
||||
localBookKeeper.runBookies()
|
||||
EventHandler.info(this, "LocalBookKeeperEnsemble started up successfully")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers
|
|||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
import akka.actor._
|
||||
import akka.event.EventHandler
|
||||
import akka.testkit.{ EventFilter, TestEvent }
|
||||
|
||||
import com.eaio.uuid.UUID
|
||||
|
||||
|
|
@ -49,8 +51,10 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
|
|||
}
|
||||
|
||||
"fail to be opened if non existing - asynchronous" in {
|
||||
EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException]))
|
||||
val uuid = (new UUID).toString
|
||||
intercept[ReplicationException](TransactionLog.logFor(uuid, true, null))
|
||||
EventHandler.notify(TestEvent.UnMuteAll)
|
||||
}
|
||||
|
||||
"be able to overweite an existing txlog if one already exists - asynchronous" in {
|
||||
|
|
@ -67,6 +71,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
|
|||
}
|
||||
|
||||
"be able to record and delete entries - asynchronous" in {
|
||||
EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException]))
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
|
||||
Thread.sleep(200)
|
||||
|
|
@ -78,6 +83,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
|
|||
txlog1.delete
|
||||
Thread.sleep(200)
|
||||
intercept[ReplicationException](TransactionLog.logFor(uuid, true, null))
|
||||
EventHandler.notify(TestEvent.UnMuteAll)
|
||||
}
|
||||
|
||||
"be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
|
||||
|
|
@ -214,14 +220,11 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
|
|||
|
||||
override def beforeAll() = {
|
||||
LocalBookKeeperEnsemble.start()
|
||||
TransactionLog.start()
|
||||
}
|
||||
|
||||
override def afterAll() = {
|
||||
Cluster.node.shutdown()
|
||||
LocalCluster.shutdownLocalCluster()
|
||||
TransactionLog.shutdown()
|
||||
LocalBookKeeperEnsemble.shutdown()
|
||||
Actor.registry.local.shutdownAll()
|
||||
Scheduler.shutdown()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers
|
|||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
import akka.actor._
|
||||
import akka.event.EventHandler
|
||||
import akka.testkit.{ EventFilter, TestEvent }
|
||||
|
||||
import com.eaio.uuid.UUID
|
||||
|
||||
|
|
@ -33,8 +35,10 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo
|
|||
}
|
||||
|
||||
"fail to be opened if non existing - synchronous" in {
|
||||
EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException]))
|
||||
val uuid = (new UUID).toString
|
||||
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
|
||||
EventHandler.notify(TestEvent.UnMuteAll)
|
||||
}
|
||||
|
||||
"be able to be checked for existence - synchronous" in {
|
||||
|
|
@ -175,16 +179,12 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo
|
|||
}
|
||||
|
||||
override def beforeAll() = {
|
||||
LocalCluster.startLocalCluster()
|
||||
LocalBookKeeperEnsemble.start()
|
||||
TransactionLog.start()
|
||||
}
|
||||
|
||||
override def afterAll() = {
|
||||
Cluster.node.shutdown()
|
||||
LocalCluster.shutdownLocalCluster()
|
||||
TransactionLog.shutdown()
|
||||
LocalBookKeeperEnsemble.shutdown()
|
||||
Actor.registry.local.shutdownAll()
|
||||
Scheduler.shutdown()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -199,7 +199,7 @@ Then there's a method that's called ``fold`` that takes a start-value, a sequenc
|
|||
|
||||
val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures
|
||||
|
||||
val futureSum = Futures.fold(0)(futures)(_ + _)
|
||||
val futureSum = Future.fold(0)(futures)(_ + _)
|
||||
|
||||
That's all it takes!
|
||||
|
||||
|
|
@ -210,7 +210,7 @@ If the sequence passed to ``fold`` is empty, it will return the start-value, in
|
|||
|
||||
val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures
|
||||
|
||||
val futureSum = Futures.reduce(futures)(_ + _)
|
||||
val futureSum = Future.reduce(futures)(_ + _)
|
||||
|
||||
Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
|
||||
|
||||
|
|
|
|||
|
|
@ -232,10 +232,13 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
|
|||
EventHandler.notify(TestEvent.Mute(filter))
|
||||
val log = TestActorRef[Logger]
|
||||
EventHandler.addListener(log)
|
||||
val eventHandlerLevel = EventHandler.level
|
||||
EventHandler.level = EventHandler.WarningLevel
|
||||
boss link ref
|
||||
val la = log.underlyingActor
|
||||
la.count must be(1)
|
||||
la.msg must (include("supervisor") and include("CallingThreadDispatcher"))
|
||||
EventHandler.level = eventHandlerLevel
|
||||
EventHandler.removeListener(log)
|
||||
EventHandler.notify(TestEvent.UnMute(filter))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,5 +6,5 @@ include "akka-reference.conf"
|
|||
|
||||
akka {
|
||||
event-handlers = ["akka.testkit.TestEventListener"]
|
||||
event-handler-level = "INFO"
|
||||
event-handler-level = "ERROR"
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue