Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
0f640fb4f9
19 changed files with 188 additions and 53 deletions
|
|
@ -57,7 +57,7 @@ private[camel] object TypedConsumerPublisher {
|
|||
*/
|
||||
def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
|
||||
CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event))
|
||||
EventHandler notifyListeners EventHandler.Info(this, "published method %s of %s at endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
|
||||
EventHandler.info(this, "published method %s of %s at endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -65,7 +65,7 @@ private[camel] object TypedConsumerPublisher {
|
|||
*/
|
||||
def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) {
|
||||
CamelContextManager.mandatoryContext.stopRoute(event.methodUuid)
|
||||
EventHandler notifyListeners EventHandler.Info(this, "unpublished method %s of %s from endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
|
||||
EventHandler.info(this, "unpublished method %s of %s from endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ trait CamelContextLifecycle {
|
|||
c.start
|
||||
t.start
|
||||
_started = true
|
||||
EventHandler notifyListeners EventHandler.Info(this, "Camel context started")
|
||||
EventHandler.info(this, "Camel context started")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -118,7 +118,7 @@ trait CamelContextLifecycle {
|
|||
c.stop
|
||||
_started = false
|
||||
_initialized = false
|
||||
EventHandler notifyListeners EventHandler.Info(this, "Camel context stopped")
|
||||
EventHandler.info(this, "Camel context stopped")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -141,7 +141,7 @@ trait CamelContextLifecycle {
|
|||
this._template = Some(context.createProducerTemplate)
|
||||
|
||||
_initialized = true
|
||||
EventHandler notifyListeners EventHandler.Info(this, "Camel context initialized")
|
||||
EventHandler.info(this, "Camel context initialized")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
11
akka-camel/src/test/resources/logback.xml
Normal file
11
akka-camel/src/test/resources/logback.xml
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="false" debug="false">
|
||||
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<root level="OFF">
|
||||
<appender-ref ref="stdout"/>
|
||||
</root>
|
||||
</configuration>
|
||||
|
|
@ -29,14 +29,15 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
private var localBookKeeper: LocalBookKeeper = _
|
||||
|
||||
"A synchronous used Transaction Log" should {
|
||||
"be able to record entries" in {
|
||||
|
||||
"be able to record entries - synchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog = TransactionLog.newLogFor(uuid, false, null)
|
||||
val entry = "hello".getBytes("UTF-8")
|
||||
txlog.recordEntry(entry)
|
||||
}
|
||||
|
||||
"be able to record and delete entries" in {
|
||||
"be able to record and delete entries - synchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
|
||||
val entry = "hello".getBytes("UTF-8")
|
||||
|
|
@ -47,7 +48,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null))
|
||||
}
|
||||
|
||||
"be able to record entries and read entries with 'entriesInRange'" in {
|
||||
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
|
||||
val entry = "hello".getBytes("UTF-8")
|
||||
|
|
@ -63,7 +64,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog2.close
|
||||
}
|
||||
|
||||
"be able to record entries and read entries with 'entries'" in {
|
||||
"be able to record entries and read entries with 'entries' - synchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
|
||||
val entry = "hello".getBytes("UTF-8")
|
||||
|
|
@ -83,7 +84,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog2.close
|
||||
}
|
||||
|
||||
"be able to record a snapshot" in {
|
||||
"be able to record a snapshot - synchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
|
||||
val snapshot = "snapshot".getBytes("UTF-8")
|
||||
|
|
@ -91,7 +92,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog1.close
|
||||
}
|
||||
|
||||
"be able to record and read a snapshot and following entries" in {
|
||||
"be able to record and read a snapshot and following entries - synchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
|
||||
val snapshot = "snapshot".getBytes("UTF-8")
|
||||
|
|
@ -117,7 +118,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog2.close
|
||||
}
|
||||
|
||||
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries" in {
|
||||
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
|
||||
|
||||
|
|
@ -155,7 +156,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog.close
|
||||
}
|
||||
|
||||
"be able to record and delete entries" in {
|
||||
"be able to record and delete entries - asynchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
|
||||
Thread.sleep(200)
|
||||
|
|
@ -168,7 +169,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
Thread.sleep(200)
|
||||
intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null))
|
||||
}
|
||||
"be able to record entries and read entries with 'entriesInRange'" in {
|
||||
"be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
|
||||
Thread.sleep(200)
|
||||
|
|
@ -190,7 +191,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog2.close
|
||||
}
|
||||
|
||||
"be able to record entries and read entries with 'entries'" in {
|
||||
"be able to record entries and read entries with 'entries' - asynchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
|
||||
Thread.sleep(200)
|
||||
|
|
@ -217,7 +218,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog2.close
|
||||
}
|
||||
|
||||
"be able to record a snapshot" in {
|
||||
"be able to record a snapshot - asynchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
|
||||
Thread.sleep(200)
|
||||
|
|
@ -227,7 +228,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog1.close
|
||||
}
|
||||
|
||||
"be able to record and read a snapshot and following entries" in {
|
||||
"be able to record and read a snapshot and following entries - asynchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
|
||||
Thread.sleep(200)
|
||||
|
|
@ -263,7 +264,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
|
|||
txlog2.close
|
||||
}
|
||||
|
||||
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries" in {
|
||||
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - asynchronous" in {
|
||||
val uuid = (new UUID).toString
|
||||
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
|
||||
Thread.sleep(200)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,58 @@
|
|||
# Define some default values that can be overridden by system properties
|
||||
zookeeper.root.logger=INFO, CONSOLE
|
||||
zookeeper.console.threshold=OFF
|
||||
zookeeper.log.dir=.
|
||||
zookeeper.log.file=zookeeper.log
|
||||
zookeeper.log.threshold=DEBUG
|
||||
zookeeper.tracelog.dir=.
|
||||
zookeeper.tracelog.file=zookeeper_trace.log
|
||||
|
||||
#
|
||||
# ZooKeeper Logging Configuration
|
||||
#
|
||||
|
||||
# Format is "<default threshold> (, <appender>)+
|
||||
|
||||
# DEFAULT: console appender only
|
||||
log4j.rootLogger=${zookeeper.root.logger}
|
||||
|
||||
# Example with rolling log file
|
||||
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
|
||||
|
||||
# Example with rolling log file and tracing
|
||||
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
|
||||
|
||||
#
|
||||
# Log INFO level and above messages to the console
|
||||
#
|
||||
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
|
||||
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
|
||||
|
||||
#
|
||||
# Add ROLLINGFILE to rootLogger to get log file output
|
||||
# Log DEBUG level and above messages to a log file
|
||||
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
|
||||
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
|
||||
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}
|
||||
|
||||
# Max log file size of 10MB
|
||||
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
|
||||
# uncomment the next line to limit number of backup files
|
||||
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
|
||||
|
||||
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
|
||||
|
||||
|
||||
#
|
||||
# Add TRACEFILE to rootLogger to get log file output
|
||||
# Log DEBUG level and above messages to a log file
|
||||
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
|
||||
log4j.appender.TRACEFILE.Threshold=TRACE
|
||||
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}
|
||||
|
||||
log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
|
||||
### Notice we are including log4j's NDC here (%x)
|
||||
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="false" debug="false">
|
||||
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<root level="OFF">
|
||||
<appender-ref ref="stdout"/>
|
||||
</root>
|
||||
</configuration>
|
||||
|
|
@ -10,7 +10,7 @@ public class UntypedCoordinatedCounter extends UntypedActor {
|
|||
private Ref<Integer> count = new Ref<Integer>(0);
|
||||
|
||||
private void increment() {
|
||||
System.out.println("incrementing");
|
||||
//System.out.println("incrementing");
|
||||
count.set(count.get() + 1);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,7 @@
|
|||
package akka.transactor.test;
|
||||
|
||||
public class ExpectedFailureException extends RuntimeException {
|
||||
public ExpectedFailureException() {
|
||||
super("Expected failure");
|
||||
}
|
||||
}
|
||||
|
|
@ -26,7 +26,7 @@ public class UntypedCoordinatedCounter extends UntypedActor {
|
|||
}
|
||||
|
||||
private void increment() {
|
||||
System.out.println(name + ": incrementing");
|
||||
//System.out.println(name + ": incrementing");
|
||||
count.set(count.get() + 1);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,13 +10,21 @@ import akka.actor.ActorRef;
|
|||
import akka.actor.UntypedActor;
|
||||
import akka.actor.UntypedActorFactory;
|
||||
import akka.dispatch.Future;
|
||||
import akka.event.EventHandler;
|
||||
import akka.testkit.EventFilter;
|
||||
import akka.testkit.ErrorFilter;
|
||||
import akka.testkit.TestEvent;
|
||||
import akka.transactor.CoordinatedTransactionException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import scala.Option;
|
||||
import scala.collection.JavaConverters;
|
||||
import scala.collection.Seq;
|
||||
|
||||
public class UntypedCoordinatedIncrementTest {
|
||||
List<ActorRef> counters;
|
||||
|
|
@ -63,6 +71,10 @@ public class UntypedCoordinatedIncrementTest {
|
|||
}
|
||||
|
||||
@Test public void incrementNoCountersWithFailingTransaction() {
|
||||
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class);
|
||||
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class);
|
||||
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter);
|
||||
EventHandler.notify(new TestEvent.Mute(ignoreExceptions));
|
||||
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
|
||||
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
|
||||
actors.add(failer);
|
||||
|
|
@ -83,6 +95,11 @@ public class UntypedCoordinatedIncrementTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
EventHandler.notify(new TestEvent.UnMute(ignoreExceptions));
|
||||
}
|
||||
|
||||
public <A> Seq<A> seq(A... args) {
|
||||
return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toSeq();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ public class UntypedCounter extends UntypedTransactor {
|
|||
}
|
||||
|
||||
private void increment() {
|
||||
System.out.println(name + ": incrementing");
|
||||
//System.out.println(name + ": incrementing");
|
||||
count.set(count.get() + 1);
|
||||
}
|
||||
|
||||
|
|
@ -48,7 +48,7 @@ public class UntypedCounter extends UntypedTransactor {
|
|||
}
|
||||
|
||||
@Override public void before(Object message) {
|
||||
System.out.println(name + ": before transaction");
|
||||
//System.out.println(name + ": before transaction");
|
||||
}
|
||||
|
||||
public void atomically(Object message) {
|
||||
|
|
@ -65,7 +65,7 @@ public class UntypedCounter extends UntypedTransactor {
|
|||
}
|
||||
|
||||
@Override public void after(Object message) {
|
||||
System.out.println(name + ": after transaction");
|
||||
//System.out.println(name + ": after transaction");
|
||||
}
|
||||
|
||||
@Override public boolean normally(Object message) {
|
||||
|
|
|
|||
|
|
@ -4,6 +4,6 @@ import akka.transactor.UntypedTransactor;
|
|||
|
||||
public class UntypedFailer extends UntypedTransactor {
|
||||
public void atomically(Object message) throws Exception {
|
||||
throw new RuntimeException("Expected failure");
|
||||
throw new ExpectedFailureException();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,13 +9,21 @@ import akka.actor.Actors;
|
|||
import akka.actor.UntypedActor;
|
||||
import akka.actor.UntypedActorFactory;
|
||||
import akka.dispatch.Future;
|
||||
import akka.event.EventHandler;
|
||||
import akka.testkit.EventFilter;
|
||||
import akka.testkit.ErrorFilter;
|
||||
import akka.testkit.TestEvent;
|
||||
import akka.transactor.CoordinatedTransactionException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import scala.Option;
|
||||
import scala.collection.JavaConverters;
|
||||
import scala.collection.Seq;
|
||||
|
||||
public class UntypedTransactorTest {
|
||||
List<ActorRef> counters;
|
||||
|
|
@ -62,6 +70,10 @@ public class UntypedTransactorTest {
|
|||
}
|
||||
|
||||
@Test public void incrementNoCountersWithFailingTransaction() {
|
||||
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class);
|
||||
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class);
|
||||
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter);
|
||||
EventHandler.notify(new TestEvent.Mute(ignoreExceptions));
|
||||
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
|
||||
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
|
||||
actors.add(failer);
|
||||
|
|
@ -82,6 +94,11 @@ public class UntypedTransactorTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
EventHandler.notify(new TestEvent.UnMute(ignoreExceptions));
|
||||
}
|
||||
|
||||
public <A> Seq<A> seq(A... args) {
|
||||
return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toSeq();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,12 +5,12 @@ import org.scalatest.matchers.MustMatchers
|
|||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
import akka.transactor.Coordinated
|
||||
import akka.actor.{ Actor, ActorRef }
|
||||
import akka.actor.{ Actor, ActorRef, ActorTimeoutException }
|
||||
import akka.stm.{ Ref, TransactionFactory }
|
||||
import akka.util.duration._
|
||||
import akka.event.EventHandler
|
||||
import akka.testkit.EventFilter
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.transactor.CoordinatedTransactionException
|
||||
import akka.testkit._
|
||||
|
||||
object CoordinatedIncrement {
|
||||
case class Increment(friends: Seq[ActorRef])
|
||||
|
|
@ -39,13 +39,15 @@ object CoordinatedIncrement {
|
|||
}
|
||||
}
|
||||
|
||||
class ExpectedFailureException extends RuntimeException("Expected failure")
|
||||
|
||||
class Failer extends Actor {
|
||||
val txFactory = TransactionFactory(timeout = 3 seconds)
|
||||
|
||||
def receive = {
|
||||
case coordinated @ Coordinated(Increment(friends)) ⇒ {
|
||||
coordinated.atomic(txFactory) {
|
||||
throw new RuntimeException("Expected failure")
|
||||
throw new ExpectedFailureException
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -55,15 +57,6 @@ object CoordinatedIncrement {
|
|||
class CoordinatedIncrementSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
|
||||
import CoordinatedIncrement._
|
||||
|
||||
override def beforeAll() {
|
||||
EventHandler notify Mute(EventFilter[RuntimeException]("Expected failure"))
|
||||
EventHandler notify Mute(EventFilter[org.multiverse.api.exceptions.DeadTransactionException]())
|
||||
}
|
||||
|
||||
override def afterAll() {
|
||||
EventHandler notify UnMuteAll
|
||||
}
|
||||
|
||||
val numCounters = 5
|
||||
val timeout = 5 seconds
|
||||
|
||||
|
|
@ -88,6 +81,11 @@ class CoordinatedIncrementSpec extends WordSpec with MustMatchers with BeforeAnd
|
|||
}
|
||||
|
||||
"increment no counters with a failing transaction" in {
|
||||
val ignoreExceptions = Seq(
|
||||
EventFilter[ExpectedFailureException],
|
||||
EventFilter[CoordinatedTransactionException],
|
||||
EventFilter[ActorTimeoutException])
|
||||
EventHandler.notify(TestEvent.Mute(ignoreExceptions))
|
||||
val (counters, failer) = createActors
|
||||
val coordinated = Coordinated()
|
||||
counters(0) ! Coordinated(Increment(counters.tail :+ failer))
|
||||
|
|
@ -97,6 +95,7 @@ class CoordinatedIncrementSpec extends WordSpec with MustMatchers with BeforeAnd
|
|||
}
|
||||
counters foreach (_.stop())
|
||||
failer.stop()
|
||||
EventHandler.notify(TestEvent.UnMute(ignoreExceptions))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,12 +5,12 @@ import org.scalatest.matchers.MustMatchers
|
|||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
import akka.transactor.Coordinated
|
||||
import akka.actor.{ Actor, ActorRef }
|
||||
import akka.actor.{ Actor, ActorRef, ActorTimeoutException }
|
||||
import akka.stm._
|
||||
import akka.util.duration._
|
||||
import akka.event.EventHandler
|
||||
import akka.testkit.EventFilter
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.transactor.CoordinatedTransactionException
|
||||
import akka.testkit._
|
||||
|
||||
import scala.util.Random.{ nextInt ⇒ random }
|
||||
|
||||
|
|
@ -59,6 +59,8 @@ object FickleFriends {
|
|||
}
|
||||
}
|
||||
|
||||
class ExpectedFailureException(message: String) extends RuntimeException(message)
|
||||
|
||||
/**
|
||||
* FickleCounter randomly fails at different points with 50% chance of failing overall.
|
||||
*/
|
||||
|
|
@ -72,7 +74,7 @@ object FickleFriends {
|
|||
}
|
||||
|
||||
def failIf(x: Int, y: Int) = {
|
||||
if (x == y) throw new RuntimeException("Random fail at position " + x)
|
||||
if (x == y) throw new ExpectedFailureException("Random fail at position " + x)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
|
|
@ -98,16 +100,6 @@ object FickleFriends {
|
|||
class FickleFriendsSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
|
||||
import FickleFriends._
|
||||
|
||||
val ignoreEvents = List(EventFilter(classOf[RuntimeException], message = "Random fail"))
|
||||
|
||||
override def beforeAll() {
|
||||
ignoreEvents foreach (f ⇒ EventHandler.notify(Mute(f)))
|
||||
}
|
||||
|
||||
override def afterAll() {
|
||||
ignoreEvents foreach (f ⇒ EventHandler.notify(UnMute(f)))
|
||||
}
|
||||
|
||||
val numCounters = 2
|
||||
|
||||
def createActors = {
|
||||
|
|
@ -119,6 +111,11 @@ class FickleFriendsSpec extends WordSpec with MustMatchers with BeforeAndAfterAl
|
|||
|
||||
"Coordinated fickle friends" should {
|
||||
"eventually succeed to increment all counters by one" in {
|
||||
val ignoreExceptions = Seq(
|
||||
EventFilter[ExpectedFailureException],
|
||||
EventFilter[CoordinatedTransactionException],
|
||||
EventFilter[ActorTimeoutException])
|
||||
EventHandler.notify(TestEvent.Mute(ignoreExceptions))
|
||||
val (counters, coordinator) = createActors
|
||||
val latch = new CountDownLatch(1)
|
||||
coordinator ! FriendlyIncrement(counters, latch)
|
||||
|
|
@ -129,6 +126,7 @@ class FickleFriendsSpec extends WordSpec with MustMatchers with BeforeAndAfterAl
|
|||
}
|
||||
counters foreach (_.stop())
|
||||
coordinator.stop()
|
||||
EventHandler.notify(TestEvent.UnMute(ignoreExceptions))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,9 +4,12 @@ import org.scalatest.WordSpec
|
|||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.transactor.Transactor
|
||||
import akka.actor.{ Actor, ActorRef }
|
||||
import akka.actor.{ Actor, ActorRef, ActorTimeoutException }
|
||||
import akka.stm._
|
||||
import akka.util.duration._
|
||||
import akka.event.EventHandler
|
||||
import akka.transactor.CoordinatedTransactionException
|
||||
import akka.testkit._
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
|
|
@ -51,9 +54,11 @@ object TransactorIncrement {
|
|||
}
|
||||
}
|
||||
|
||||
class ExpectedFailureException extends RuntimeException("Expected failure")
|
||||
|
||||
class Failer extends Transactor {
|
||||
def atomically = {
|
||||
case _ ⇒ throw new RuntimeException("Expected failure")
|
||||
case _ ⇒ throw new ExpectedFailureException
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -99,6 +104,11 @@ class TransactorSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"increment no counters with a failing transaction" in {
|
||||
val ignoreExceptions = Seq(
|
||||
EventFilter[ExpectedFailureException],
|
||||
EventFilter[CoordinatedTransactionException],
|
||||
EventFilter[ActorTimeoutException])
|
||||
EventHandler.notify(TestEvent.Mute(ignoreExceptions))
|
||||
val (counters, failer) = createTransactors
|
||||
val failLatch = new CountDownLatch(numCounters + 1)
|
||||
counters(0) ! Increment(counters.tail :+ failer, failLatch)
|
||||
|
|
@ -108,6 +118,7 @@ class TransactorSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
counters foreach (_.stop())
|
||||
failer.stop()
|
||||
EventHandler.notify(TestEvent.UnMute(ignoreExceptions))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -229,6 +229,8 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
|
|||
val boss = Actor.actorOf(new Actor { def receive = { case _ ⇒ } }).start()
|
||||
val ref = TestActorRef[WorkerActor].start()
|
||||
|
||||
val filter = EventFilter.custom(_ ⇒ true)
|
||||
EventHandler.notify(TestEvent.Mute(filter))
|
||||
val log = TestActorRef[Logger]
|
||||
EventHandler.addListener(log)
|
||||
boss link ref
|
||||
|
|
@ -236,6 +238,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
|
|||
la.count must be(1)
|
||||
la.msg must (include("supervisor") and include("CallingThreadDispatcher"))
|
||||
EventHandler.removeListener(log)
|
||||
EventHandler.notify(TestEvent.UnMute(filter))
|
||||
}
|
||||
|
||||
"proxy apply for the underlying actor" in {
|
||||
|
|
|
|||
|
|
@ -6,4 +6,5 @@ include "akka-reference.conf"
|
|||
|
||||
akka {
|
||||
event-handlers = ["akka.testkit.TestEventListener"]
|
||||
event-handler-level = "WARNING"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ object AkkaBuild extends Build {
|
|||
id = "akka",
|
||||
base = file("."),
|
||||
settings = parentSettings ++ Unidoc.settings ++ rstdocSettings ++ Seq(
|
||||
parallelExecution in GlobalScope := false,
|
||||
Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
|
||||
rstdocDirectory <<= baseDirectory / "akka-docs"
|
||||
),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue