diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index a23cc923b4..61bbae14f7 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -6,10 +6,10 @@ package akka.camel import akka.camel.internal._ import akka.util.Timeout -import scala.concurrent.Future import akka.actor.{ ActorSystem, Props, ActorRef } import akka.pattern._ import scala.concurrent.util.Duration +import concurrent.{ ExecutionContext, Future } /** * Activation trait that can be used to wait on activation or de-activation of Camel endpoints. @@ -27,11 +27,11 @@ trait Activation { * @param endpoint the endpoint to be activated * @param timeout the timeout for the Future */ - def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[ActorRef] = + def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] = (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ - case EndpointActivated(`endpoint`) ⇒ endpoint - case EndpointFailedToActivate(_, cause) ⇒ throw cause - })(system.dispatcher) + case EndpointActivated(`endpoint`) ⇒ endpoint + case EndpointFailedToActivate(`endpoint`, cause) ⇒ throw cause + }) /** * Produces a Future which will be completed when the given endpoint has been deactivated or @@ -40,9 +40,9 @@ trait Activation { * @param endpoint the endpoint to be deactivated * @param timeout the timeout of the Future */ - def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration): Future[Unit] = - (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit]({ - case EndpointDeActivated(`endpoint`) ⇒ () - case EndpointFailedToDeActivate(_, cause) ⇒ throw cause - })(system.dispatcher) + def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] = + (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ + case EndpointDeActivated(`endpoint`) ⇒ endpoint + case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause + }) } \ No newline at end of file diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala index 5531cda109..c69c2f55bf 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala @@ -40,7 +40,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒ */ private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = { idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer) - Await.result(activationFutureFor(consumer.self)(activationTimeout), activationTimeout) + Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout) } } diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index 8bb72baf00..b30f541bb1 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -7,13 +7,15 @@ package akka.camel; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.testkit.JavaTestKit; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; import org.junit.AfterClass; import org.junit.Test; - -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import akka.testkit.AkkaSpec; +import akka.testkit.JavaTestKit.EventFilter; import static org.junit.Assert.assertEquals; @@ -23,9 +25,7 @@ import static org.junit.Assert.assertEquals; */ public class ConsumerJavaTestBase { - static ActorSystem system = ActorSystem.create("test"); - static Camel camel = CamelExtension.get(system); - + static ActorSystem system = ActorSystem.create("test", AkkaSpec.testConf()); @AfterClass public static void tearDownAfterClass() { @@ -34,12 +34,24 @@ public class ConsumerJavaTestBase { @Test public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception { - Duration timeout = Duration.create(1, TimeUnit.SECONDS); - ActorRef ref = Await.result( - camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout), - timeout); - - String result = camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); - assertEquals("error: hello", result); + new JavaTestKit(system) {{ + String result = new EventFilter(Exception.class) { + protected String run() { + Duration timeout = Duration.create(1, TimeUnit.SECONDS); + Camel camel = CamelExtension.get(system); + ExecutionContext executionContext = system.dispatcher(); + try { + ActorRef ref = Await.result( + camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout, executionContext), + timeout); + return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class); + } + catch (Exception e) { + return e.getMessage(); + } + } + }.occurrences(1).exec(); + assertEquals("error: hello", result); + }}; } } diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java index df668b72a1..bb7bf4042f 100644 --- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java @@ -5,6 +5,7 @@ import akka.camel.internal.component.CamelPath; import akka.camel.javaapi.UntypedConsumerActor; import akka.camel.javaapi.UntypedProducerActor; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; @@ -59,8 +60,9 @@ public class CustomRouteTestBase { public void testCustomConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( - camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout), + camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testRouteConsumer",consumer)); camel.template().sendBody("direct:testRouteConsumer", "test"); @@ -72,11 +74,12 @@ public class CustomRouteTestBase { public void testCustomAckConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor( system.actorOf( new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAck","mock:mockAck"); } }), "testConsumerAck"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testAck", consumer, false, timeout)); camel.template().sendBody("direct:testAck", "test"); @@ -87,10 +90,11 @@ public class CustomRouteTestBase { @Test public void testCustomAckConsumerRouteFromUri() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); + ExecutionContext executionContext = system.dispatcher(); Duration timeout = Duration.create(10, TimeUnit.SECONDS); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testAckFromUri","akka://test/user/testConsumerAckUri?autoAck=false")); camel.template().sendBody("direct:testAckFromUri", "test"); @@ -101,9 +105,10 @@ public class CustomRouteTestBase { @Test(expected=CamelExecutionException.class) public void testCustomTimeoutConsumerRoute() throws Exception { Duration timeout = Duration.create(10, TimeUnit.SECONDS); + ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"), - timeout), + timeout, executionContext), timeout); camel.context().addRoutes(new CustomRouteBuilder("direct:testException", consumer, false, Duration.create(0, TimeUnit.SECONDS))); camel.template().sendBody("direct:testException", "test"); diff --git a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala index 9d3ea5ef69..fe79bacdd8 100644 --- a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala @@ -20,6 +20,7 @@ import java.util.concurrent.TimeoutException class ActivationIntegrationTest extends WordSpec with MustMatchers with SharedCamelSystem { implicit val timeout = 10 seconds def template: ProducerTemplate = camel.template + import system.dispatcher "ActivationAware must be notified when endpoint is activated" in { val latch = new TestLatch(0) diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index 08ee3cf99d..6ffacf3432 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -15,18 +15,23 @@ import org.apache.camel.model.RouteDefinition import org.apache.camel.builder.Builder import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException } import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } -import akka.testkit.TestLatch import akka.actor.Status.Failure -import scala.concurrent.Await import scala.concurrent.util.duration._ +import concurrent.{ ExecutionContext, Await } +import akka.testkit._ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem { "ConsumerIntegrationTest" must { implicit val defaultTimeout = 10.seconds + implicit def ec: ExecutionContext = system.dispatcher "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in { - val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) - intercept[FailedToCreateRouteException] { Await.result(camel.activationFutureFor(actorRef), defaultTimeout) } + filterEvents(EventFilter[ActorInitializationException](occurrences = 1), EventFilter[FailedToCreateRouteException](occurrences = 1)) { + val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri"))) + intercept[FailedToCreateRouteException] { + Await.result(camel.activationFutureFor(actorRef), defaultTimeout) + } + } } "Consumer must support in-out messaging" in { @@ -62,7 +67,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC def endpointUri = "direct:a2" def receive = { - case "throw" ⇒ throw new Exception + case "throw" ⇒ throw new TestException("") case m: CamelMessage ⇒ sender ! "received " + m.bodyAs[String] } @@ -70,11 +75,12 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC restarted.countDown() } }) - consumer ! "throw" - Await.ready(restarted, defaultTimeout) + filterEvents(EventFilter[TestException](occurrences = 1)) { + consumer ! "throw" + Await.ready(restarted, defaultTimeout) - val response = camel.sendTo("direct:a2", msg = "xyz") - response must be("received xyz") + camel.sendTo("direct:a2", msg = "xyz") must be("received xyz") + } } "Consumer must unregister itself when stopped" in { @@ -103,19 +109,23 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC "Error passing consumer supports error handling through route modification" in { start(new ErrorThrowingConsumer("direct:error-handler-test") with ErrorPassing { override def onRouteDefinition(rd: RouteDefinition) = { - rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end + rd.onException(classOf[TestException]).handled(true).transform(Builder.exceptionMessage).end } }) - camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello") + filterEvents(EventFilter[TestException](occurrences = 1)) { + camel.sendTo("direct:error-handler-test", msg = "hello") must be("error: hello") + } } "Error passing consumer supports redelivery through route modification" in { start(new FailingOnceConsumer("direct:failing-once-concumer") with ErrorPassing { override def onRouteDefinition(rd: RouteDefinition) = { - rd.onException(classOf[Exception]).maximumRedeliveries(1).end + rd.onException(classOf[TestException]).maximumRedeliveries(1).end } }) - camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello") + filterEvents(EventFilter[TestException](occurrences = 1)) { + camel.sendTo("direct:failing-once-concumer", msg = "hello") must be("accepted: hello") + } } "Consumer supports manual Ack" in { @@ -154,7 +164,7 @@ class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedC class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer { def receive = { - case msg: CamelMessage ⇒ throw new Exception("error: %s" format msg.body) + case msg: CamelMessage ⇒ throw new TestException("error: %s" format msg.body) } } @@ -165,7 +175,7 @@ class FailingOnceConsumer(override val endpointUri: String) extends Consumer { if (msg.headerAs[Boolean]("CamelRedelivered").getOrElse(false)) sender ! ("accepted: %s" format msg.body) else - throw new Exception("rejected: %s" format msg.body) + throw new TestException("rejected: %s" format msg.body) } } @@ -184,3 +194,5 @@ trait ErrorPassing { trait ManualAckConsumer extends Consumer { override def autoAck = false } + +class TestException(msg: String) extends Exception(msg) diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 3de8055875..e9d5382843 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -18,7 +18,7 @@ import akka.pattern._ import scala.concurrent.util.duration._ import akka.util.Timeout import org.scalatest.matchers.MustMatchers -import akka.testkit.TestLatch +import akka.testkit._ /** * Tests the features of the Camel Producer. @@ -70,14 +70,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd })) val producer = Await.result[ActorRef](supervisor.ask(Props(new TestProducer("direct:producer-test-2"))).mapTo[ActorRef], timeoutDuration) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed - - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the producer - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) } Await.ready(latch, timeoutDuration) deadActor must be(Some(producer)) @@ -117,14 +113,11 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce message to direct:producer-test-3 and receive failure response" in { val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3"))) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the producer - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) } } @@ -148,13 +141,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - // a failure response must have been returned by the forward target - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } } @@ -169,10 +159,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in { val target = system.actorOf(Props[ProducingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target))) - mockEndpoint.expectedMessageCount(1) - mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) - producer.tell(CamelMessage("fail", Map()), producer) - mockEndpoint.assertIsSatisfied() + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + mockEndpoint.expectedMessageCount(1) + mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) + producer.tell(CamelMessage("fail", Map()), producer) + mockEndpoint.assertIsSatisfied() + } } "produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in { @@ -194,12 +186,10 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeoutDuration).failed - Await.ready(future, timeoutDuration).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val e = intercept[AkkaCamelException] { Await.result(producer.ask(message)(timeoutDuration), timeoutDuration) } + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } } @@ -214,10 +204,12 @@ class ProducerFeatureTest extends WordSpec with BeforeAndAfterAll with BeforeAnd "produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { val target = system.actorOf(Props[ProducingForwardTarget]) val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target))) - mockEndpoint.expectedMessageCount(1) - mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) - producer.tell(CamelMessage("fail", Map()), producer) - mockEndpoint.assertIsSatisfied() + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + mockEndpoint.expectedMessageCount(1) + mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) + producer.tell(CamelMessage("fail", Map()), producer) + mockEndpoint.assertIsSatisfied() + } } } diff --git a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala index 5fcf409937..4d46544b99 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala @@ -16,6 +16,8 @@ import scala.concurrent.Await class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSystem { implicit val timeout = 5.seconds + implicit val ec = system.dispatcher + "A ProducerRegistry" must { def newEmptyActor: ActorRef = system.actorOf(Props.empty) def registerProcessorFor(actorRef: ActorRef) = camel.registerProducer(actorRef, "mock:mock")._2 diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index 2920e558a0..fe22b0e7a0 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -16,12 +16,13 @@ import scala.reflect.ClassTag import akka.actor.{ ActorRef, Props, ActorSystem, Actor } import concurrent.Await import akka.util.Timeout +import akka.testkit.AkkaSpec private[camel] object TestSupport { def start(actor: ⇒ Actor)(implicit system: ActorSystem): ActorRef = { val actorRef = system.actorOf(Props(actor)) - Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds), 10 seconds) + Await.result(CamelExtension(system).activationFutureFor(actorRef)(10 seconds, system.dispatcher), 10 seconds) actorRef } @@ -47,7 +48,7 @@ private[camel] object TestSupport { } trait SharedCamelSystem extends BeforeAndAfterAll { this: Suite ⇒ - implicit lazy val system = ActorSystem("test") + implicit lazy val system = ActorSystem("test", AkkaSpec.testConf) implicit lazy val camel = CamelExtension(system) abstract override protected def afterAll() { @@ -62,7 +63,7 @@ private[camel] object TestSupport { override protected def beforeEach() { super.beforeEach() - system = ActorSystem("test") + system = ActorSystem("test", AkkaSpec.testConf) camel = CamelExtension(system) } diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala index 21e9800b87..11178277b9 100644 --- a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala @@ -16,6 +16,7 @@ import akka.pattern._ import scala.concurrent.Await import scala.concurrent.util.duration._ import org.scalatest._ +import akka.testkit._ import matchers.MustMatchers class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with SharedCamelSystem with GivenWhenThen { @@ -49,13 +50,15 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter val producer = system.actorOf(Props[SampleUntypedReplyingProducer]) val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) - val future = producer.ask(message)(timeout).failed + filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { + val future = producer.ask(message)(timeout).failed - Await.ready(future, timeout).value match { - case Some(Right(e: AkkaCamelException)) ⇒ - e.getMessage must be("failure") - e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) - case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + Await.ready(future, timeout).value match { + case Some(Right(e: AkkaCamelException)) ⇒ + e.getMessage must be("failure") + e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) + case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) + } } } } @@ -67,7 +70,6 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map[String, Any]()), producer) - mockEndpoint.assertIsSatisfied } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 5b3b977d90..f3a6c1a730 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -6,12 +6,10 @@ package akka.cluster import scala.collection.immutable.SortedSet import scala.concurrent.util.{ Deadline, Duration } import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, RootActorPath, PoisonPill, Scheduler } +import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler } import akka.actor.Status.Failure import akka.event.EventStream -import akka.routing.ScatterGatherFirstCompletedRouter import akka.util.Timeout -import akka.pattern.{ AskTimeoutException, ask, pipe } import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import language.existentials @@ -75,15 +73,20 @@ private[cluster] object InternalClusterAction { */ case class InitJoinAck(address: Address) extends ClusterMessage - case object GossipTick + /** + * Marker interface for periodic tick messages + */ + sealed trait Tick - case object HeartbeatTick + case object GossipTick extends Tick - case object ReapUnreachableTick + case object HeartbeatTick extends Tick - case object LeaderActionsTick + case object ReapUnreachableTick extends Tick - case object PublishStatsTick + case object LeaderActionsTick extends Tick + + case object PublishStatsTick extends Tick case class SendClusterMessage(to: Address, msg: ClusterMessage) @@ -215,7 +218,14 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) }) override def preStart(): Unit = { - if (AutoJoin) self ! InternalClusterAction.JoinSeedNode + if (AutoJoin) { + // only the node which is named first in the list of seed nodes will join itself + if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + self ! JoinTo(selfAddress) + else + context.actorOf(Props(new JoinSeedNodeProcess(environment)). + withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") + } } override def postStop(): Unit = { @@ -226,7 +236,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) publishStateTask foreach { _.cancel() } } - def receive = { + def uninitialized: Actor.Receive = { + case InitJoin ⇒ // skip, not ready yet + case JoinTo(address) ⇒ join(address) + case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case _: Tick ⇒ // ignore periodic tasks until initialized + } + + def initialized: Actor.Receive = { case msg: GossipEnvelope ⇒ receiveGossip(msg) case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) case GossipTick ⇒ gossip() @@ -234,10 +252,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() case PublishStatsTick ⇒ publishInternalStats() - case JoinSeedNode ⇒ joinSeedNode() case InitJoin ⇒ initJoin() - case InitJoinAck(address) ⇒ join(address) - case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() case JoinTo(address) ⇒ join(address) case ClusterUserAction.Join(address) ⇒ joining(address) case ClusterUserAction.Down(address) ⇒ downing(address) @@ -251,21 +266,10 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } - def joinSeedNode(): Unit = { - val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) } - if (seedRoutees.isEmpty) join(selfAddress) - else { - implicit val within = Timeout(SeedNodeTimeout) - val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration))) - seedRouter ! InitJoin - seedRouter ! PoisonPill - } - } + def receive = uninitialized def initJoin(): Unit = sender ! InitJoinAck(selfAddress) - def joinSeedNodeTimeout(): Unit = join(selfAddress) - /** * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. @@ -281,7 +285,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) publish(localGossip) - coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) + context.become(initialized) + if (address == selfAddress) + joining(address) + else + coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) } /** @@ -870,6 +878,53 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def ping(p: Ping): Unit = sender ! Pong(p) } +/** + * INTERNAL API. + * + * Sends InitJoinAck to all seed nodes (except itself) and expect + * InitJoinAck reply back. The seed node that replied first + * will be used, joined to. InitJoinAck replies received after the + * first one are ignored. + * + * Retries if no InitJoinAck replies are received within the + * SeedNodeTimeout. + * When at least one reply has been received it stops itself after + * an idle SeedNodeTimeout. + * + */ +private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging { + import InternalClusterAction._ + + def selfAddress = environment.selfAddress + + if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + throw new IllegalArgumentException("Join seed node should not be done") + + context.setReceiveTimeout(environment.settings.SeedNodeTimeout) + + override def preStart(): Unit = self ! JoinSeedNode + + def receive = { + case JoinSeedNode ⇒ + // send InitJoin to all seed nodes (except myself) + environment.seedNodes.collect { + case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a)) + } foreach { _ ! InitJoin } + case InitJoinAck(address) ⇒ + // first InitJoinAck reply + context.parent ! JoinTo(address) + context.become(done) + case ReceiveTimeout ⇒ + // no InitJoinAck received, try again + self ! JoinSeedNode + } + + def done: Actor.Receive = { + case InitJoinAck(_) ⇒ // already received one, skip rest + case ReceiveTimeout ⇒ context.stop(self) + } +} + /** * INTERNAL API. */ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index f71ebe3cc3..10d98cd86b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -13,6 +13,7 @@ import scala.concurrent.util.duration._ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val seed1 = role("seed1") val seed2 = role("seed2") + val seed3 = role("seed3") val ordinary1 = role("ordinary1") val ordinary2 = role("ordinary2") @@ -25,6 +26,7 @@ class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPup class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy abstract class JoinSeedNodeSpec extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) @@ -32,37 +34,24 @@ abstract class JoinSeedNodeSpec import JoinSeedNodeMultiJvmSpec._ - override def seedNodes = IndexedSeq(seed1, seed2) + override def seedNodes = IndexedSeq(seed1, seed2, seed3) "A cluster with configured seed nodes" must { - "start the seed nodes sequentially" taggedAs LongRunningTest in { - // without looking up the addresses first there might be - // [akka://JoinSeedNodeSpec/user/TestConductorClient] cannot write GetAddress(RoleName(seed2)) while waiting for seed1 - roles foreach address + "be able to start the seed nodes concurrently" taggedAs LongRunningTest in { runOn(seed1) { - startClusterNode() + // test that first seed doesn't have to be started first + Thread.sleep(3000) } - enterBarrier("seed1-started") - runOn(seed2) { - startClusterNode() - } - enterBarrier("seed2-started") - - runOn(seed1, seed2) { - awaitUpConvergence(2) + runOn(seed1, seed2, seed3) { + awaitUpConvergence(3) } enterBarrier("after-1") } "join the seed nodes at startup" taggedAs LongRunningTest in { - - startClusterNode() - enterBarrier("all-started") - - awaitUpConvergence(4) - + awaitUpConvergence(roles.size) enterBarrier("after-2") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 73837ef697..93c1f921ae 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -22,7 +22,7 @@ import akka.actor.RootActorPath object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { - auto-join = off + auto-join = on auto-down = off gossip-interval = 200 ms heartbeat-interval = 400 ms diff --git a/akka-docs/dev/developer-guidelines.rst b/akka-docs/dev/developer-guidelines.rst index 59f5a85349..a43d1b48e7 100644 --- a/akka-docs/dev/developer-guidelines.rst +++ b/akka-docs/dev/developer-guidelines.rst @@ -13,7 +13,7 @@ Akka is using ``Scalariform`` to format the source code as part of the build. So Process ------- -* Make sure you have signed the Akka CLA, if not, ask for it on the Mailing List. +* Make sure you have signed the Akka CLA, if not, `sign it online `_. * Pick a ticket, if there is no ticket for your work then create one first. * Start working in a feature branch. Name it something like ``wip---``. * When you are done, create a GitHub Pull-Request towards the targeted branch and email the Akka Mailing List that you want it reviewed diff --git a/akka-docs/java/code/docs/camel/ActivationTestBase.java b/akka-docs/java/code/docs/camel/ActivationTestBase.java index d53b66a20b..d75b59b02b 100644 --- a/akka-docs/java/code/docs/camel/ActivationTestBase.java +++ b/akka-docs/java/code/docs/camel/ActivationTestBase.java @@ -29,13 +29,13 @@ public class ActivationTestBase { Camel camel = CamelExtension.get(system); // get a future reference to the activation of the endpoint of the Consumer Actor FiniteDuration duration = Duration.create(10, SECONDS); - Future activationFuture = camel.activationFutureFor(producer, duration); + Future activationFuture = camel.activationFutureFor(producer, duration, system.dispatcher()); //#CamelActivation //#CamelDeactivation // .. system.stop(producer); // get a future reference to the deactivation of the endpoint of the Consumer Actor - Future deactivationFuture = camel.activationFutureFor(producer, duration); + Future deactivationFuture = camel.deactivationFutureFor(producer, duration, system.dispatcher()); //#CamelDeactivation system.shutdown(); } diff --git a/akka-docs/java/logging.rst b/akka-docs/java/logging.rst index 647525ba76..51a764f74f 100644 --- a/akka-docs/java/logging.rst +++ b/akka-docs/java/logging.rst @@ -146,7 +146,7 @@ If you want to see all messages that are received through remoting at DEBUG log } } -Also see the logging options for TestKit: :ref:`actor.logging`. +Also see the logging options for TestKit: :ref:`actor.logging-java`. Event Handler diff --git a/akka-docs/java/remoting.rst b/akka-docs/java/remoting.rst index 82a736973f..4aa62a63be 100644 --- a/akka-docs/java/remoting.rst +++ b/akka-docs/java/remoting.rst @@ -43,6 +43,11 @@ As you can see in the example above there are four things you need to add to get communicate across the network. * Add port number - the port the actor system should listen on, set to 0 to have it chosen automatically +.. note:: + The port number needs to be unique for each actor system on the same machine even if the actor + systems have different names. This is because each actor system has its own network subsystem + listening for connections and handling messages as not to interfere with other actor systems. + The example above only illustrates the bare minimum of properties you have to add to enable remoting. There are lots of more properties that are related to remoting in Akka. We refer to the following reference file for more information: diff --git a/akka-docs/project/licenses.rst b/akka-docs/project/licenses.rst index 7dbcf5ef9f..4887412e8e 100644 --- a/akka-docs/project/licenses.rst +++ b/akka-docs/project/licenses.rst @@ -29,168 +29,8 @@ Akka License Akka Committer License Agreement -------------------------------- -All committers have signed this CLA - -:: - - Based on: http://www.apache.org/licenses/icla.txt - - Typesafe Inc. - Individual Contributor License Agreement ("Agreement") V2.0 - http://www.scalablesolutions.se/licenses/ - - Thank you for your interest in Akka, a Typesafe Inc. (the - "Company") Open Source project. In order to clarify the intellectual - property license granted with Contributions from any person or entity, - the Company must have a Contributor License Agreement ("CLA") on file - that has been signed by each Contributor, indicating agreement to the - license terms below. This license is for your protection as a - Contributor as well as the protection of the Company and its users; - it does not change your rights to use your own Contributions for any - other purpose. - - Full name: ______________________________________________________ - - Mailing Address: ________________________________________________ - - _________________________________________________________________ - - _________________________________________________________________ - - Country: ______________________________________________________ - - Telephone: ______________________________________________________ - - Facsimile: ______________________________________________________ - - E-Mail: ______________________________________________________ - - You accept and agree to the following terms and conditions for Your - present and future Contributions submitted to the Company. In - return, the Company shall not use Your Contributions in a way that - is contrary to the public benefit or inconsistent with its nonprofit - status and bylaws in effect at the time of the Contribution. Except - for the license granted herein to the Company and recipients of - software distributed by the Company, You reserve all right, title, - and interest in and to Your Contributions. - - 1. Definitions. - - "You" (or "Your") shall mean the copyright owner or legal entity - authorized by the copyright owner that is making this Agreement - with the Company. For legal entities, the entity making a - Contribution and all other entities that control, are controlled - by, or are under common control with that entity are considered to - be a single Contributor. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "Contribution" shall mean any original work of authorship, - including any modifications or additions to an existing work, that - is intentionally submitted by You to the Company for inclusion - in, or documentation of, any of the products owned or managed by - the Company (the "Work"). For the purposes of this definition, - "submitted" means any form of electronic, verbal, or written - communication sent to the Company or its representatives, - including but not limited to communication on electronic mailing - lists, source code control systems, and issue tracking systems that - are managed by, or on behalf of, the Company for the purpose of - discussing and improving the Work, but excluding communication that - is conspicuously marked or otherwise designated in writing by You - as "Not a Contribution." - - 2. Grant of Copyright License. Subject to the terms and conditions of - this Agreement, You hereby grant to the Company and to - recipients of software distributed by the Company a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare derivative works of, - publicly display, publicly perform, sublicense, and distribute Your - Contributions and such derivative works. - - 3. Grant of Patent License. Subject to the terms and conditions of - this Agreement, You hereby grant to the Company and to - recipients of software distributed by the Company a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have - made, use, offer to sell, sell, import, and otherwise transfer the - Work, where such license applies only to those patent claims - licensable by You that are necessarily infringed by Your - Contribution(s) alone or by combination of Your Contribution(s) - with the Work to which such Contribution(s) was submitted. If any - entity institutes patent litigation against You or any other entity - (including a cross-claim or counterclaim in a lawsuit) alleging - that your Contribution, or the Work to which you have contributed, - constitutes direct or contributory patent infringement, then any - patent licenses granted to that entity under this Agreement for - that Contribution or Work shall terminate as of the date such - litigation is filed. - - 4. You agree that all Contributions are and will be given entirely - voluntarily. Company will not be required to use, or to refrain - from using, any Contributions that You, will not, absent a - separate written agreement signed by Company, create any - confidentiality obligation of Company, and Company has not - undertaken any obligation to treat any Contributions or other - information You have given Company or will give Company in the - future as confidential or proprietary information. Furthermore, - except as otherwise provided in a separate subsequence written - agreement between You and Company, Company will be free to use, - disclose, reproduce, license or otherwise distribute, and exploit - the Contributions as it sees fit, entirely without obligation or - restriction of any kind on account of any proprietary or - intellectual property rights or otherwise. - - 5. You represent that you are legally entitled to grant the above - license. If your employer(s) has rights to intellectual property - that you create that includes your Contributions, you represent - that you have received permission to make Contributions on behalf - of that employer, that your employer has waived such rights for - your Contributions to the Company, or that your employer has - executed a separate Corporate CLA with the Company. - - 6. You represent that each of Your Contributions is Your original - creation (see section 7 for submissions on behalf of others). You - represent that Your Contribution submissions include complete - details of any third-party license or other restriction (including, - but not limited to, related patents and trademarks) of which you - are personally aware and which are associated with any part of Your - Contributions. - - 7. You are not expected to provide support for Your Contributions, - except to the extent You desire to provide support. You may provide - support for free, for a fee, or not at all. Unless required by - applicable law or agreed to in writing, You provide Your - Contributions on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied, including, without - limitation, any warranties or conditions of TITLE, NON- - INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. - - 8. Should You wish to submit work that is not Your original creation, - You may submit it to the Company separately from any - Contribution, identifying the complete details of its source and of - any license or other restriction (including, but not limited to, - related patents, trademarks, and license agreements) of which you - are personally aware, and conspicuously marking the work as - "Submitted on behalf of a third-party: [named here]". - - 9. You agree to notify the Company of any facts or circumstances of - which you become aware that would make these representations - inaccurate in any respect. - - 9. The validity of the interpretation of this Agreements shall be - governed by, and constructed and enforced in accordance with, the - laws of Sweden, applicable to the agreements made there (excluding - the conflict of law rules). This Agreement embodies the entire - agreement and understanding of the parties hereto and supersedes - any and all prior agreements, arrangements and understandings - relating to the matters provided for herein. No alteration, waiver, - amendment changed or supplement hereto shall be binding more - effective unless the same as set forth in writing signed by both - parties. - - Please sign: __________________________________ Date: ________________ +All committers have signed this `CLA `_. +It can be `signed online `_. Licenses for Dependency Libraries --------------------------------- diff --git a/akka-docs/scala/code/docs/camel/Introduction.scala b/akka-docs/scala/code/docs/camel/Introduction.scala index aa9546defb..6abb8c33bc 100644 --- a/akka-docs/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/docs/camel/Introduction.scala @@ -92,12 +92,12 @@ object Introduction { val camel = CamelExtension(system) val actorRef = system.actorOf(Props[MyEndpoint]) // get a future reference to the activation of the endpoint of the Consumer Actor - val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds) + val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher) //#CamelActivation //#CamelDeactivation system.stop(actorRef) // get a future reference to the deactivation of the endpoint of the Consumer Actor - val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds) + val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher) //#CamelDeactivation } diff --git a/akka-docs/scala/io.rst b/akka-docs/scala/io.rst index ba12324aa5..2712ed5cd1 100644 --- a/akka-docs/scala/io.rst +++ b/akka-docs/scala/io.rst @@ -58,7 +58,7 @@ After extracting data from a ``ByteIterator``, the remaining content can also be .. includecode:: code/akka/docs/io/BinaryCoding.scala :include: rest-to-seq -with no copying from bytes to rest involved. In general, conversions from ByteString to ByteIterator and vice versa are O(1) for non-chunked ``ByteString``s and (at worst) O(nChunks) for chunked ByteStrings. +with no copying from bytes to rest involved. In general, conversions from ByteString to ByteIterator and vice versa are O(1) for non-chunked ByteStrings and (at worst) O(nChunks) for chunked ByteStrings. Encoding of data also is very natural, using ``ByteStringBuilder`` diff --git a/akka-docs/scala/logging.rst b/akka-docs/scala/logging.rst index 8f765b4f7e..3379a8f585 100644 --- a/akka-docs/scala/logging.rst +++ b/akka-docs/scala/logging.rst @@ -165,7 +165,7 @@ If you want to see all messages that are received through remoting at DEBUG log } } -Also see the logging options for TestKit: :ref:`actor.logging`. +Also see the logging options for TestKit: :ref:`actor.logging-scala`. Translating Log Source to String and Class ------------------------------------------ diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index ab49765fad..a165272ddb 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -40,6 +40,11 @@ As you can see in the example above there are four things you need to add to get communicate across the network. * Add port number - the port the actor system should listen on, set to 0 to have it chosen automatically +.. note:: + The port number needs to be unique for each actor system on the same machine even if the actor + systems have different names. This is because each actor system has its own network subsystem + listening for connections and handling messages as not to interfere with other actor systems. + The example above only illustrates the bare minimum of properties you have to add to enable remoting. There are lots of more properties that are related to remoting in Akka. We refer to the following reference file for more information: diff --git a/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala index 03338c6b24..2728a80894 100644 --- a/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala +++ b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala @@ -38,17 +38,21 @@ class SimpleNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRT "simple.xml" must { "set up ActorSystem when bundle starts" in { - serviceForType[ActorSystem] must not be (null) + filterErrors() { + serviceForType[ActorSystem] must not be (null) + } } "stop the ActorSystem when bundle stops" in { - val system = serviceForType[ActorSystem] - system.isTerminated must be(false) + filterErrors() { + val system = serviceForType[ActorSystem] + system.isTerminated must be(false) - bundleForName(TEST_BUNDLE_NAME).stop() + bundleForName(TEST_BUNDLE_NAME).stop() - system.awaitTermination() - system.isTerminated must be(true) + system.awaitTermination() + system.isTerminated must be(true) + } } } @@ -64,19 +68,23 @@ class ConfigNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRT "config.xml" must { "set up ActorSystem when bundle starts" in { - val system = serviceForType[ActorSystem] - system must not be (null) - system.settings.config.getString("some.config.key") must be("value") + filterErrors() { + val system = serviceForType[ActorSystem] + system must not be (null) + system.settings.config.getString("some.config.key") must be("value") + } } "stop the ActorSystem when bundle stops" in { - val system = serviceForType[ActorSystem] - system.isTerminated must be(false) + filterErrors() { + val system = serviceForType[ActorSystem] + system.isTerminated must be(false) - bundleForName(TEST_BUNDLE_NAME).stop() + bundleForName(TEST_BUNDLE_NAME).stop() - system.awaitTermination() - system.isTerminated must be(true) + system.awaitTermination() + system.isTerminated must be(true) + } } } @@ -93,9 +101,11 @@ class DependencyInjectionNamespaceHandlerTest extends WordSpec with MustMatchers "injection.xml" must { "set up bean containing ActorSystem" in { - val bean = serviceForType[ActorSystemAwareBean] - bean must not be (null) - bean.system must not be (null) + filterErrors() { + val bean = serviceForType[ActorSystemAwareBean] + bean must not be (null) + bean.system must not be (null) + } } } diff --git a/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala b/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala index e1781b4a80..80bac1529f 100644 --- a/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala +++ b/akka-osgi/src/test/scala/akka/osgi/ActorSystemActivatorTest.scala @@ -38,21 +38,24 @@ class PingPongActorSystemActivatorTest extends WordSpec with MustMatchers with P "PingPongActorSystemActivator" must { "start and register the ActorSystem when bundle starts" in { - val system = serviceForType[ActorSystem] - val actor = system.actorFor("/user/pong") + filterErrors() { + val system = serviceForType[ActorSystem] + val actor = system.actorFor("/user/pong") - implicit val timeout = Timeout(5 seconds) - Await.result(actor ? Ping, timeout.duration) must be(Pong) + implicit val timeout = Timeout(5 seconds) + Await.result(actor ? Ping, timeout.duration) must be(Pong) + } } "stop the ActorSystem when bundle stops" in { - val system = serviceForType[ActorSystem] - system.isTerminated must be(false) + filterErrors() { + val system = serviceForType[ActorSystem] + system.isTerminated must be(false) - bundleForName(TEST_BUNDLE_NAME).stop() - - system.awaitTermination() - system.isTerminated must be(true) + bundleForName(TEST_BUNDLE_NAME).stop() + system.awaitTermination() + system.isTerminated must be(true) + } } } @@ -67,7 +70,9 @@ class RuntimeNameActorSystemActivatorTest extends WordSpec with MustMatchers wit "RuntimeNameActorSystemActivator" must { "register an ActorSystem and add the bundle id to the system name" in { - serviceForType[ActorSystem].name must equal(TestActivators.ACTOR_SYSTEM_NAME_PATTERN.format(bundleForName(TEST_BUNDLE_NAME).getBundleId)) + filterErrors() { + serviceForType[ActorSystem].name must equal(TestActivators.ACTOR_SYSTEM_NAME_PATTERN.format(bundleForName(TEST_BUNDLE_NAME).getBundleId)) + } } } diff --git a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala index 9e1b1012db..e993d04f01 100644 --- a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala +++ b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala @@ -13,10 +13,11 @@ import org.osgi.framework._ import java.net.URL import java.util.jar.JarInputStream -import java.io.{ FileInputStream, FileOutputStream, File } +import java.io._ import org.scalatest.{ BeforeAndAfterAll, Suite } import java.util.{ UUID, Date, ServiceLoader, HashMap } import scala.reflect.ClassTag +import scala.Some /** * Trait that provides support for building akka-osgi tests using PojoSR @@ -32,6 +33,8 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll { */ def testBundles: Seq[BundleDescriptor] + val bufferedLoadingErrors = new ByteArrayOutputStream() + lazy val context: BundleContext = { val config = new HashMap[String, AnyRef]() System.setProperty("org.osgi.framework.storage", "target/akka-osgi/" + UUID.randomUUID().toString) @@ -40,7 +43,15 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll { bundles.addAll(testBundles) config.put(PojoServiceRegistryFactory.BUNDLE_DESCRIPTORS, bundles) - ServiceLoader.load(classOf[PojoServiceRegistryFactory]).iterator.next.newPojoServiceRegistry(config).getBundleContext + val oldErr = System.err + System.setErr(new PrintStream(bufferedLoadingErrors)) + try { + ServiceLoader.load(classOf[PojoServiceRegistryFactory]).iterator.next.newPojoServiceRegistry(config).getBundleContext + } catch { + case e: Throwable ⇒ oldErr.write(bufferedLoadingErrors.toByteArray); throw e + } finally { + System.setErr(oldErr) + } } // Ensure bundles get stopped at the end of the test to release resources and stop threads @@ -72,6 +83,14 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll { } protected def buildTestBundles(builders: Seq[BundleDescriptorBuilder]): Seq[BundleDescriptor] = builders map (_.build) + + def filterErrors()(block: ⇒ Unit): Unit = { + try { + block + } catch { + case e: Throwable ⇒ System.err.write(bufferedLoadingErrors.toByteArray); throw e + } + } } object PojoSRTestSupport { diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 7ae3219c5d..3064f05ed2 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -107,6 +107,7 @@ akka { # (I) The default remote server port clients should connect to. # Default is 2552 (AKKA), use 0 if you want a random available port + # This port needs to be unique for each actor system on the same machine. port = 2552 # (O) The address of a local network interface (IP Address) to bind to when creating diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 8d62f30923..47f58bdce6 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -502,6 +502,50 @@ object AkkaBuild extends Build { def akkaPreviousArtifact(id: String, organization: String = "com.typesafe.akka", version: String = "2.0"): Option[sbt.ModuleID] = if (enableMiMa) Some(organization % id % version) // the artifact to compare binary compatibility with else None + + // OSGi settings + + object OSGi { + + val actor = exports(Seq("akka*")) + + val agent = exports(Seq("akka.agent.*")) + + val camel = exports(Seq("akka.camel.*")) + + val cluster = exports(Seq("akka.cluster.*")) + + val fileMailbox = exports(Seq("akka.actor.mailbox.*")) + + val mailboxesCommon = exports(Seq("akka.actor.mailbox.*")) + + val osgi = exports(Seq("akka.osgi")) ++ Seq(OsgiKeys.privatePackage := Seq("akka.osgi.impl")) + + val osgiAries = exports() ++ Seq(OsgiKeys.privatePackage := Seq("akka.osgi.aries.*")) + + val remote = exports(Seq("akka.remote.*", "akka.routing.*", "akka.serialization.*")) + + val slf4j = exports(Seq("akka.event.slf4j.*")) + + val dataflow = exports(Seq("akka.dataflow.*")) + + val transactor = exports(Seq("akka.transactor.*")) + + val zeroMQ = exports(Seq("akka.zeromq.*")) + + def exports(packages: Seq[String] = Seq()) = osgiSettings ++ Seq( + OsgiKeys.importPackage := defaultImports, + OsgiKeys.exportPackage := packages, + packagedArtifact in (Compile, packageBin) <<= (artifact in (Compile, packageBin), OsgiKeys.bundle).identityMap, + artifact in (Compile, packageBin) ~= (_.copy(`type` = "bundle")) + ) + + def defaultImports = Seq("!sun.misc", akkaImport(), configImport(), scalaImport(), "*") + def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,2.2)\"".format(packageName) + def configImport(packageName: String = "com.typesafe.config.*") = "%s;version=\"[0.4.1,0.5)\"".format(packageName) + def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.10,2.11)\"".format(packageName) + + } } // Dependencies @@ -583,45 +627,3 @@ object Dependency { } } - -// OSGi settings - -object OSGi { - - val actor = exports(Seq("akka*")) - - val agent = exports(Seq("akka.agent.*")) - - val camel = exports(Seq("akka.camel.*")) - - val cluster = exports(Seq("akka.cluster.*")) - - val fileMailbox = exports(Seq("akka.actor.mailbox.*")) - - val mailboxesCommon = exports(Seq("akka.actor.mailbox.*")) - - val osgi = exports(Seq("akka.osgi")) ++ Seq(OsgiKeys.privatePackage := Seq("akka.osgi.impl")) - - val osgiAries = exports() ++ Seq(OsgiKeys.privatePackage := Seq("akka.osgi.aries.*")) - - val remote = exports(Seq("akka.remote.*", "akka.routing.*", "akka.serialization.*")) - - val slf4j = exports(Seq("akka.event.slf4j.*")) - - val dataflow = exports(Seq("akka.dataflow.*")) - - val transactor = exports(Seq("akka.transactor.*")) - - val zeroMQ = exports(Seq("akka.zeromq.*")) - - def exports(packages: Seq[String] = Seq()) = osgiSettings ++ Seq( - OsgiKeys.importPackage := defaultImports, - OsgiKeys.exportPackage := packages - ) - - def defaultImports = Seq("!sun.misc", akkaImport(), configImport(), scalaImport(), "*") - def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,2.2)\"".format(packageName) - def configImport(packageName: String = "com.typesafe.config.*") = "%s;version=\"[0.4.1,0.5)\"".format(packageName) - def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.10,2.11)\"".format(packageName) - -}