From 769786d544fe20d0a1342f41facd25d9db42a821 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Thu, 14 Oct 2010 14:22:08 +0200 Subject: [PATCH 01/11] Upgrade to Camel 2.5-SNAPSHOT, Jetty 7.1.6.v20100715 and ActiveMQ 5.4.1 --- project/build/AkkaProject.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 87bbf0f3e9..dc87078172 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -88,12 +88,17 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val voldemortModuleConfig = ModuleConfiguration("voldemort", ClojarsRepo) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! + // ------------------------------------- + // TEMPORARY + // ------------------------------------- + lazy val apacheSnapshots = MavenRepository("apache-snapshots", "https://repository.apache.org/content/repositories/snapshots/") + // ------------------------------------------------------------------------------------------------------------------- // Versions // ------------------------------------------------------------------------------------------------------------------- lazy val ATMO_VERSION = "0.6.2" - lazy val CAMEL_VERSION = "2.4.0" + lazy val CAMEL_VERSION = "2.5-SNAPSHOT" lazy val CASSANDRA_VERSION = "0.6.1" lazy val DISPATCH_VERSION = "0.7.4" lazy val HAWT_DISPATCH_VERSION = "1.0" @@ -103,9 +108,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val SCALATEST_VERSION = "1.2" lazy val LOGBACK_VERSION = "0.9.24" lazy val SLF4J_VERSION = "1.6.0" - lazy val SPRING_VERSION = "3.0.3.RELEASE" + lazy val SPRING_VERSION = "3.0.4.RELEASE" lazy val ASPECTWERKZ_VERSION = "2.2.2" - lazy val JETTY_VERSION = "7.1.4.v20100610" + lazy val JETTY_VERSION = "7.1.6.v20100715" // ------------------------------------------------------------------------------------------------------------------- // Dependencies @@ -773,12 +778,12 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { - + - + From 96b8b455ed116c9712b3ba7ef68b137f18f060cd Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Fri, 15 Oct 2010 15:35:17 +0200 Subject: [PATCH 02/11] Improve API to wait for endpoint activation/deactivation. Closes #472 --- akka-camel/src/main/scala/CamelService.scala | 29 ++++++++- akka-camel/src/test/scala/ConsumerTest.scala | 63 +++++++++---------- .../src/test/scala/RemoteConsumerTest.scala | 20 +++--- .../main/scala/StandaloneApplication.scala | 25 +++----- .../scala/HttpConcurrencyTestStress.scala | 8 +-- 5 files changed, 77 insertions(+), 68 deletions(-) diff --git a/akka-camel/src/main/scala/CamelService.scala b/akka-camel/src/main/scala/CamelService.scala index 3795b8a7fb..23ff029b5e 100644 --- a/akka-camel/src/main/scala/CamelService.scala +++ b/akka-camel/src/main/scala/CamelService.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.camel import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import org.apache.camel.CamelContext @@ -94,12 +95,34 @@ trait CamelService extends Bootable with Logging { CamelContextManager.stop } + /** + * Waits for an expected number (count) of endpoints to be activated + * during execution of f. The wait-timeout is by default 10 seconds. + * Other timeout values can be set via the timeout and timeUnit + * parameters. + */ + def awaitEndpointActivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit) = { + val activation = expectEndpointActivationCount(count) + f; activation.await(timeout, timeUnit) + } + + /** + * Waits for an expected number (count) of endpoints to be de-activated + * during execution of f. The wait-timeout is by default 10 seconds. + * Other timeout values can be set via the timeout and timeUnit + * parameters. + */ + def awaitEndpointDeactivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit) = { + val activation = expectEndpointDeactivationCount(count) + f; activation.await(timeout, timeUnit) + } + /** * Sets an expectation on the number of upcoming endpoint activations and returns - * a CountDownLatch that can be used to wait for the activations to occur. Endpoint + * a CountDownLatch that can be used to wait for the activations to occur. Endpoint * activations that occurred in the past are not considered. */ - def expectEndpointActivationCount(count: Int): CountDownLatch = + private def expectEndpointActivationCount(count: Int): CountDownLatch = (consumerPublisher !! SetExpectedRegistrationCount(count)).as[CountDownLatch].get /** @@ -107,7 +130,7 @@ trait CamelService extends Bootable with Logging { * a CountDownLatch that can be used to wait for the de-activations to occur. Endpoint * de-activations that occurred in the past are not considered. */ - def expectEndpointDeactivationCount(count: Int): CountDownLatch = + private def expectEndpointDeactivationCount(count: Int): CountDownLatch = (consumerPublisher !! SetExpectedUnregistrationCount(count)).as[CountDownLatch].get } diff --git a/akka-camel/src/test/scala/ConsumerTest.scala b/akka-camel/src/test/scala/ConsumerTest.scala index 0af8aec7d5..ec8f6661fc 100644 --- a/akka-camel/src/test/scala/ConsumerTest.scala +++ b/akka-camel/src/test/scala/ConsumerTest.scala @@ -28,12 +28,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { // start consumer publisher, otherwise we cannot set message // count expectations in the next step (needed for testing only). service.consumerPublisher.start - // set expectations on publish count - val latch = service.expectEndpointActivationCount(1) - // start the CamelService - service.start - // await publication of first test consumer - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(1) { + service.start + } must be (true) } override protected def afterAll = { @@ -55,9 +52,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { } "started" must { "support an in-out message exchange via its endpoint" in { - val latch = service.expectEndpointActivationCount(1) - consumer.start - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(1) { + consumer.start + } must be (true) mandatoryTemplate.requestBody("direct:publish-test-2", "msg2") must equal ("received msg2") } "have an associated endpoint in the CamelContext" in { @@ -66,9 +63,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { } "stopped" must { "not support an in-out message exchange via its endpoint" in { - val latch = service.expectEndpointDeactivationCount(1) - consumer.stop - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointDeactivation(1) { + consumer.stop + } must be (true) intercept[CamelExecutionException] { mandatoryTemplate.requestBody("direct:publish-test-2", "msg2") } @@ -80,9 +77,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { var actor: SampleTypedConsumer = null "started" must { "support in-out message exchanges via its endpoints" in { - val latch = service.expectEndpointActivationCount(3) - actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl]) - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(3) { + actor = TypedActor.newInstance(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl]) + } must be (true) mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal ("m2: x y") mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal ("m3: x y") mandatoryTemplate.requestBodyAndHeader("direct:m4", "x", "test", "y") must equal ("m4: x y") @@ -90,9 +87,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { } "stopped" must { "not support in-out message exchanges via its endpoints" in { - val latch = service.expectEndpointDeactivationCount(3) - TypedActor.stop(actor) - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointDeactivation(3) { + TypedActor.stop(actor) + } must be (true) intercept[CamelExecutionException] { mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") } @@ -110,18 +107,18 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { var actor: TestTypedConsumer = null "started" must { "support in-out message exchanges via its endpoints" in { - val latch = service.expectEndpointActivationCount(2) - actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl]) - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(2) { + actor = TypedActor.newInstance(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl]) + } must be (true) mandatoryTemplate.requestBody("direct:publish-test-3", "x") must equal ("foo: x") mandatoryTemplate.requestBody("direct:publish-test-4", "x") must equal ("bar: x") } } "stopped" must { "not support in-out message exchanges via its endpoints" in { - val latch = service.expectEndpointDeactivationCount(2) - TypedActor.stop(actor) - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointDeactivation(2) { + TypedActor.stop(actor) + } must be (true) intercept[CamelExecutionException] { mandatoryTemplate.requestBody("direct:publish-test-3", "x") } @@ -136,17 +133,17 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { val consumer = UntypedActor.actorOf(classOf[SampleUntypedConsumer]) "started" must { "support an in-out message exchange via its endpoint" in { - val latch = service.expectEndpointActivationCount(1) - consumer.start - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(1) { + consumer.start + } must be (true) mandatoryTemplate.requestBodyAndHeader("direct:test-untyped-consumer", "x", "test", "y") must equal ("x y") } } "stopped" must { "not support an in-out message exchange via its endpoint" in { - val latch = service.expectEndpointDeactivationCount(1) - consumer.stop - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointDeactivation(1) { + consumer.stop + } must be (true) intercept[CamelExecutionException] { mandatoryTemplate.sendBodyAndHeader("direct:test-untyped-consumer", "blah", "test", "blub") } @@ -157,9 +154,9 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { "A non-responding, blocking consumer" when { "receiving an in-out message exchange" must { "lead to a TimeoutException" in { - val latch = service.expectEndpointActivationCount(1) - actorOf(new TestBlocker("direct:publish-test-5")).start - latch.await(5000, TimeUnit.MILLISECONDS) must be (true) + service.awaitEndpointActivation(1) { + actorOf(new TestBlocker("direct:publish-test-5")).start + } must be (true) try { mandatoryTemplate.requestBody("direct:publish-test-5", "msg3") diff --git a/akka-camel/src/test/scala/RemoteConsumerTest.scala b/akka-camel/src/test/scala/RemoteConsumerTest.scala index 2218aac25a..8c469a9379 100644 --- a/akka-camel/src/test/scala/RemoteConsumerTest.scala +++ b/akka-camel/src/test/scala/RemoteConsumerTest.scala @@ -45,9 +45,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh val consumer = actorOf[RemoteConsumer].start when("remote consumer publication is triggered") - var latch = mandatoryService.expectEndpointActivationCount(1) - consumer !! "init" - assert(latch.await(5000, TimeUnit.MILLISECONDS)) + assert(mandatoryService.awaitEndpointActivation(1) { + consumer !! "init" + }) then("the published consumer is accessible via its endpoint URI") val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-consumer", "test") @@ -61,10 +61,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh val consumer = TypedActor.newRemoteInstance(classOf[SampleRemoteTypedConsumer], classOf[SampleRemoteTypedConsumerImpl], host, port) when("remote typed consumer publication is triggered") - var latch = mandatoryService.expectEndpointActivationCount(1) - consumer.foo("init") - assert(latch.await(5000, TimeUnit.MILLISECONDS)) - + assert(mandatoryService.awaitEndpointActivation(1) { + consumer.foo("init") + }) then("the published method is accessible via its endpoint URI") val response = CamelContextManager.mandatoryTemplate.requestBody("direct:remote-typed-consumer", "test") assert(response === "remote typed actor: test") @@ -77,10 +76,9 @@ class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWh val consumer = UntypedActor.actorOf(classOf[SampleRemoteUntypedConsumer]).start when("remote untyped consumer publication is triggered") - var latch = mandatoryService.expectEndpointActivationCount(1) - consumer.sendRequestReply(Message("init", Map("test" -> "init"))) - assert(latch.await(5000, TimeUnit.MILLISECONDS)) - + assert(mandatoryService.awaitEndpointActivation(1) { + consumer.sendRequestReply(Message("init", Map("test" -> "init"))) + }) then("the published untyped consumer is accessible via its endpoint URI") val response = CamelContextManager.mandatoryTemplate.requestBodyAndHeader("direct:remote-untyped-consumer", "a", "test", "b") assert(response === "a b") diff --git a/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala index 2ecccb1e02..80684be78a 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala @@ -28,14 +28,10 @@ object StandaloneApplication extends Application { // access 'externally' registered typed actors assert("hello msg1" == mandatoryContext.createProducerTemplate.requestBody("direct:test", "msg1")) - // set expectations on upcoming endpoint activation - val activation = mandatoryService.expectEndpointActivationCount(1) - - // 'internally' register typed actor (requires CamelService) - TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl]) - - // internal registration is done in background. Wait a bit ... - activation.await + mandatoryService.awaitEndpointActivation(1) { + // 'internally' register typed actor (requires CamelService) + TypedActor.newInstance(classOf[TypedConsumer2], classOf[TypedConsumer2Impl]) + } // access 'internally' (automatically) registered typed-actors // (see @consume annotation value at TypedConsumer2.foo method) @@ -85,17 +81,13 @@ object StandaloneJmsApplication extends Application { startCamelService - // Expect two consumer endpoints to be activated - val completion = mandatoryService.expectEndpointActivationCount(2) - val jmsUri = "jms:topic:test" - // Wire publisher and consumer using a JMS topic - val jmsSubscriber1 = Actor.actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start - val jmsSubscriber2 = Actor.actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start val jmsPublisher = Actor.actorOf(new Publisher("jms-publisher", jmsUri)).start - // wait for the consumer (subscriber) endpoint being activated - completion.await + mandatoryService.awaitEndpointActivation(2) { + Actor.actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start + Actor.actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start + } // Send 10 messages to via publisher actor for(i <- 1 to 10) { @@ -108,6 +100,5 @@ object StandaloneJmsApplication extends Application { } stopCamelService - ActorRegistry.shutdownAll } diff --git a/akka-samples/akka-sample-camel/src/test/scala/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/HttpConcurrencyTestStress.scala index 76cbc58a8b..e4530160f2 100644 --- a/akka-samples/akka-sample-camel/src/test/scala/HttpConcurrencyTestStress.scala +++ b/akka-samples/akka-sample-camel/src/test/scala/HttpConcurrencyTestStress.scala @@ -44,15 +44,15 @@ class HttpConcurrencyTestStress extends JUnitSuite { object HttpConcurrencyTestStress { @BeforeClass - def beforeClass = { + def beforeClass: Unit = { startCamelService val workers = for (i <- 1 to 8) yield actorOf[HttpServerWorker].start val balancer = loadBalancerActor(new CyclicIterator(workers.toList)) - val completion = service.get.expectEndpointActivationCount(1) - val server = actorOf(new HttpServerActor(balancer)).start - completion.await + service.get.awaitEndpointActivation(1) { + actorOf(new HttpServerActor(balancer)).start + } } @AfterClass From 513e9c567689fd83206d50cdd514836c583694e4 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Fri, 15 Oct 2010 16:54:25 +0200 Subject: [PATCH 03/11] Improve Java API to wait for endpoint activation/deactivation. Closes #472 --- akka-camel/src/main/scala/CamelService.scala | 48 ++++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/akka-camel/src/main/scala/CamelService.scala b/akka-camel/src/main/scala/CamelService.scala index 23ff029b5e..4b3e8834b7 100644 --- a/akka-camel/src/main/scala/CamelService.scala +++ b/akka-camel/src/main/scala/CamelService.scala @@ -11,8 +11,8 @@ import org.apache.camel.CamelContext import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry} import se.scalablesolutions.akka.config.Config._ -import se.scalablesolutions.akka.japi.{Option => JOption} import se.scalablesolutions.akka.util.{Logging, Bootable} +import se.scalablesolutions.akka.japi.{SideEffect, Option => JOption} /** * Publishes (untyped) consumer actors and typed consumer actors via Camel endpoints. Actors @@ -101,7 +101,7 @@ trait CamelService extends Bootable with Logging { * Other timeout values can be set via the timeout and timeUnit * parameters. */ - def awaitEndpointActivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit) = { + def awaitEndpointActivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit): Boolean = { val activation = expectEndpointActivationCount(count) f; activation.await(timeout, timeUnit) } @@ -112,11 +112,53 @@ trait CamelService extends Bootable with Logging { * Other timeout values can be set via the timeout and timeUnit * parameters. */ - def awaitEndpointDeactivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit) = { + def awaitEndpointDeactivation(count: Int, timeout: Long = 10, timeUnit: TimeUnit = TimeUnit.SECONDS)(f: => Unit): Boolean = { val activation = expectEndpointDeactivationCount(count) f; activation.await(timeout, timeUnit) } + /** + * Waits for an expected number (count) of endpoints to be activated + * during execution of p. The wait timeout is 10 seconds. + *

+ * Java API + */ + def awaitEndpointActivation(count: Int, p: SideEffect): Boolean = { + awaitEndpointActivation(count, 10, TimeUnit.SECONDS, p) + } + + /** + * Waits for an expected number (count) of endpoints to be activated + * during execution of p. Timeout values can be set via the + * timeout and timeUnit parameters. + *

+ * Java API + */ + def awaitEndpointActivation(count: Int, timeout: Long, timeUnit: TimeUnit, p: SideEffect): Boolean = { + awaitEndpointActivation(count, timeout, timeUnit) { p.apply } + } + + /** + * Waits for an expected number (count) of endpoints to be de-activated + * during execution of p. The wait timeout is 10 seconds. + *

+ * Java API + */ + def awaitEndpointDeactivation(count: Int, p: SideEffect): Boolean = { + awaitEndpointDeactivation(count, 10, TimeUnit.SECONDS, p) + } + + /** + * Waits for an expected number (count) of endpoints to be de-activated + * during execution of p. Timeout values can be set via the + * timeout and timeUnit parameters. + *

+ * Java API + */ + def awaitEndpointDeactivation(count: Int, timeout: Long, timeUnit: TimeUnit, p: SideEffect): Boolean = { + awaitEndpointDeactivation(count, timeout, timeUnit) { p.apply } + } + /** * Sets an expectation on the number of upcoming endpoint activations and returns * a CountDownLatch that can be used to wait for the activations to occur. Endpoint From a1b29e8093fae068e162d8f29ea4caaa15fff51a Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sat, 16 Oct 2010 10:13:06 +0200 Subject: [PATCH 04/11] Added missing Consumer trait to example actor --- akka-samples/akka-sample-camel/src/main/scala/Actors.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala index db2aab1729..80c854f954 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -41,7 +41,7 @@ class Consumer1 extends Actor with Consumer with Logging { } } -class Consumer2 extends Actor { +class Consumer2 extends Actor with Consumer { def endpointUri = "jetty:http://0.0.0.0:8877/camel/default" def receive = { From 561cdfcf00c5a05d0d8d464ef4f395ed6784b6b5 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Tue, 19 Oct 2010 10:44:04 +0200 Subject: [PATCH 05/11] Upgrade to Camel 2.5 release candidate leaving ActiveMQ at version 5.3.2 because of https://issues.apache.org/activemq/browse/AMQ-2935 --- project/build/AkkaProject.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index dc87078172..37ca9108dd 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -91,14 +91,14 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------- // TEMPORARY // ------------------------------------- - lazy val apacheSnapshots = MavenRepository("apache-snapshots", "https://repository.apache.org/content/repositories/snapshots/") + val camelStaging = MavenRepository("camel-staging", "https://repository.apache.org/content/repositories/orgapachecamel-001/") // ------------------------------------------------------------------------------------------------------------------- // Versions // ------------------------------------------------------------------------------------------------------------------- lazy val ATMO_VERSION = "0.6.2" - lazy val CAMEL_VERSION = "2.5-SNAPSHOT" + lazy val CAMEL_VERSION = "2.5.0" lazy val CASSANDRA_VERSION = "0.6.1" lazy val DISPATCH_VERSION = "0.7.4" lazy val HAWT_DISPATCH_VERSION = "1.0" @@ -783,7 +783,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { - + From dcde8373b2d8a5d0340eae52d91b0bda06044533 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sun, 24 Oct 2010 18:47:28 +0200 Subject: [PATCH 06/11] Use a cached JMS ConnectionFactory. --- .../akka-sample-camel/src/main/resources/context-jms.xml | 6 +++++- .../src/main/scala/StandaloneApplication.scala | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml b/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml index b3d811d8de..12e4541be3 100644 --- a/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml +++ b/akka-samples/akka-sample-camel/src/main/resources/context-jms.xml @@ -13,7 +13,11 @@ http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> - + + + + + diff --git a/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala index 80684be78a..ed858c4cc9 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala @@ -99,6 +99,9 @@ object StandaloneJmsApplication extends Application { CamelContextManager.mandatoryTemplate.sendBody(jmsUri, "Camel rocks (%d)" format i) } + // Wait a bit for subscribes to receive messages + Thread.sleep(1000) + stopCamelService ActorRegistry.shutdownAll } From aa586ea71859e5963accd3c60f2733cdbe4d10ca Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Tue, 26 Oct 2010 07:48:41 +0200 Subject: [PATCH 07/11] Upgrade to Camel 2.5 release candidate 2 --- project/build/AkkaProject.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 6e91d6088c..483ae79ba8 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -116,7 +116,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------- // TEMPORARY // ------------------------------------- - val camelStaging = MavenRepository("camel-staging", "https://repository.apache.org/content/repositories/orgapachecamel-001/") + val camelStaging = MavenRepository("camel-staging", "https://repository.apache.org/content/repositories/orgapachecamel-004/") // ------------------------------------------------------------------------------------------------------------------- // Versions From 01b1a0eed946fec211d6a643574e3dd6db9e4c0b Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Fri, 29 Oct 2010 11:37:41 +0200 Subject: [PATCH 08/11] Fixed compile error after resolving merge conflict --- akka-camel/src/main/scala/CamelService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-camel/src/main/scala/CamelService.scala b/akka-camel/src/main/scala/CamelService.scala index 0c72100ba8..b546636610 100644 --- a/akka-camel/src/main/scala/CamelService.scala +++ b/akka-camel/src/main/scala/CamelService.scala @@ -11,7 +11,7 @@ import org.apache.camel.CamelContext import akka.actor.Actor._ import akka.actor.{AspectInitRegistry, ActorRegistry} import akka.config.Config._ -import akka.japi.{Option => JOption} +import akka.japi.{SideEffect, Option => JOption} import akka.util.{Logging, Bootable} /** From a2a27a169b4f1529a82f9ec30ed2587c949ed2bb Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Fri, 29 Oct 2010 11:54:34 +0200 Subject: [PATCH 09/11] Remove Camel staging repo --- project/build/AkkaProject.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index e2f3b8d3e6..d8a570a7a7 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -113,11 +113,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val voldemortModuleConfig = ModuleConfiguration("voldemort", ClojarsRepo) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! - // ------------------------------------- - // TEMPORARY - // ------------------------------------- - val camelStaging = MavenRepository("camel-staging", "https://repository.apache.org/content/repositories/orgapachecamel-004/") - // ------------------------------------------------------------------------------------------------------------------- // Versions // ------------------------------------------------------------------------------------------------------------------- From 297519264274b1341832c18fed18e9b58fd8217b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 29 Oct 2010 15:31:07 +0200 Subject: [PATCH 10/11] Adding shutdown hook that clears logging levels registered by Configgy, closing ticket 486 --- akka-actor/src/main/scala/actor/Actor.scala | 40 ++++++++++++++++++++- akka-http/src/main/scala/AkkaLoader.scala | 2 ++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index 943854fc61..a2f7d60848 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -82,6 +82,45 @@ class ActorTimeoutException private[akka](message: String) extends AkkaException * @author Jonas Bonér */ object Actor extends Logging { + + /** + * Add shutdown cleanups + */ + private[akka] lazy val shutdownHook = { + val hook = new Runnable { + override def run { + log.info("Running shutdown hook to do a cleanup of registered components.") + // Shutdown HawtDispatch GlobalQueue + org.fusesource.hawtdispatch.ScalaDispatch.globalQueue.asInstanceOf[org.fusesource.hawtdispatch.internal.GlobalDispatchQueue].shutdown + + // Clear Thread.subclassAudits + val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") + tf.setAccessible(true) + val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]] + subclassAudits.synchronized {subclassAudits.clear} + + // Clear and reset j.u.l.Level.known (due to Configgy) + val lf = classOf[java.util.logging.Level].getDeclaredField("known") + lf.setAccessible(true) + val known = lf.get(null).asInstanceOf[java.util.ArrayList[java.util.logging.Level]] + known.synchronized { + known.clear + known.add(java.util.logging.Level.OFF) + known.add(java.util.logging.Level.SEVERE) + known.add(java.util.logging.Level.WARNING) + known.add(java.util.logging.Level.INFO) + known.add(java.util.logging.Level.CONFIG) + known.add(java.util.logging.Level.FINE) + known.add(java.util.logging.Level.FINER) + known.add(java.util.logging.Level.FINEST) + known.add(java.util.logging.Level.ALL) + } + } + } + Runtime.getRuntime.addShutdownHook(new Thread(hook)) + hook + } + val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) @@ -110,7 +149,6 @@ object Actor extends Logging { def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) /** -<<<<<<< HEAD:akka-actor/src/main/scala/actor/Actor.scala * Creates an ActorRef out of the Actor with type T. *

    *   import Actor._
diff --git a/akka-http/src/main/scala/AkkaLoader.scala b/akka-http/src/main/scala/AkkaLoader.scala
index d8afac67bc..22ff40016c 100644
--- a/akka-http/src/main/scala/AkkaLoader.scala
+++ b/akka-http/src/main/scala/AkkaLoader.scala
@@ -6,6 +6,7 @@ package akka.servlet
 
 import akka.config.Config
 import akka.util.{Logging, Bootable}
+import akka.actor.Actor
 
 /*
  * This class is responsible for booting up a stack of bundles and then shutting them down
@@ -40,6 +41,7 @@ class AkkaLoader extends Logging {
       log.info("Shutting down Akka...")
       _bundles.foreach(_.onUnload)
       _bundles = None
+      Actor.shutdownHook.run
       log.info("Akka succesfully shut down")
     }
   }

From 0906bb5d3f32ad9cee508cbe531d03a22f8a10ce Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Fri, 29 Oct 2010 16:11:43 +0200
Subject: [PATCH 11/11] Cleaned up shutdown hook code and increased readability

---
 akka-actor/src/main/scala/actor/Actor.scala | 22 +++++++++------------
 1 file changed, 9 insertions(+), 13 deletions(-)

diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala
index a2f7d60848..0fe1e9766b 100644
--- a/akka-actor/src/main/scala/actor/Actor.scala
+++ b/akka-actor/src/main/scala/actor/Actor.scala
@@ -89,31 +89,27 @@ object Actor extends Logging {
   private[akka] lazy val shutdownHook = {
     val hook = new Runnable {
       override def run {
-        log.info("Running shutdown hook to do a cleanup of registered components.")
         // Shutdown HawtDispatch GlobalQueue
+        log.info("Shutting down Hawt Dispatch global queue")
         org.fusesource.hawtdispatch.ScalaDispatch.globalQueue.asInstanceOf[org.fusesource.hawtdispatch.internal.GlobalDispatchQueue].shutdown
 
-      // Clear Thread.subclassAudits
+        // Clear Thread.subclassAudits
+        log.info("Clearing subclass audits")
         val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits")
         tf.setAccessible(true)
         val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]]
         subclassAudits.synchronized {subclassAudits.clear}
 
         // Clear and reset j.u.l.Level.known (due to Configgy)
-        val lf = classOf[java.util.logging.Level].getDeclaredField("known")
+        log.info("Removing Configgy-installed log levels")
+        import java.util.logging.Level
+        val lf = classOf[Level].getDeclaredField("known")
         lf.setAccessible(true)
-        val known = lf.get(null).asInstanceOf[java.util.ArrayList[java.util.logging.Level]]
+        val known = lf.get(null).asInstanceOf[java.util.ArrayList[Level]]
         known.synchronized {
           known.clear
-          known.add(java.util.logging.Level.OFF)
-          known.add(java.util.logging.Level.SEVERE)
-          known.add(java.util.logging.Level.WARNING)
-          known.add(java.util.logging.Level.INFO)
-          known.add(java.util.logging.Level.CONFIG)
-          known.add(java.util.logging.Level.FINE)
-          known.add(java.util.logging.Level.FINER)
-          known.add(java.util.logging.Level.FINEST)
-          known.add(java.util.logging.Level.ALL)
+          List(Level.OFF,Level.SEVERE,Level.WARNING,Level.INFO,Level.CONFIG,
+               Level.FINE,Level.FINER,Level.FINEST,Level.ALL) foreach known.add
         }
       }
     }