diff --git a/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectComponent.java b/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectComponent.java new file mode 100644 index 0000000000..f84788e1fe --- /dev/null +++ b/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectComponent.java @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel.component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; +import org.apache.camel.component.bean.BeanComponent; +import org.apache.camel.component.bean.BeanEndpoint; +import org.apache.camel.component.bean.BeanHolder; + +/** + * Camel component for accessing active objects. + * + * @author Martin Krasser + */ +public class ActiveObjectComponent extends BeanComponent { + + public static final String DEFAULT_SCHEMA = "actobj"; + + private Map registry = new ConcurrentHashMap(); + + public Map getActiveObjectRegistry() { + return registry; + } + + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { + BeanEndpoint beanEndpoint = new BeanEndpoint(uri, this); + beanEndpoint.setBeanName(remaining); + beanEndpoint.setBeanHolder(createBeanHolder(remaining)); + Processor processor = beanEndpoint.getProcessor(); + setProperties(processor, parameters); + return beanEndpoint; + } + + private BeanHolder createBeanHolder(String beanName) throws Exception { + BeanHolder holder = new ActiveObjectHolder(registry, getCamelContext(), beanName).createCacheHolder(); + registry.remove(beanName); + return holder; + } + +} diff --git a/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectHolder.java b/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectHolder.java new file mode 100644 index 0000000000..289faeeac1 --- /dev/null +++ b/akka-camel/src/main/java/se/scalablesolutions/akka/camel/component/ActiveObjectHolder.java @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.camel.component; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.NoSuchBeanException; +import org.apache.camel.component.bean.BeanInfo; +import org.apache.camel.component.bean.RegistryBean; + +/** + * @author Martin Krasser + */ +public class ActiveObjectHolder extends RegistryBean { + + private Map activeObjectRegistry; + + public ActiveObjectHolder(Map activeObjectRegistry, CamelContext context, String name) { + super(context, name); + this.activeObjectRegistry = activeObjectRegistry; + } + + @Override + public BeanInfo getBeanInfo() { + return new BeanInfo(getContext(), getBean().getClass(), getParameterMappingStrategy()); + } + + @Override + public Object getBean() throws NoSuchBeanException { + return activeObjectRegistry.get(getName()); + } + +} diff --git a/akka-camel/src/main/scala/CamelContextLifecycle.scala b/akka-camel/src/main/scala/CamelContextLifecycle.scala index 9aee8f5a1d..1015b56b64 100644 --- a/akka-camel/src/main/scala/CamelContextLifecycle.scala +++ b/akka-camel/src/main/scala/CamelContextLifecycle.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.camel import org.apache.camel.{ProducerTemplate, CamelContext} import org.apache.camel.impl.DefaultCamelContext +import se.scalablesolutions.akka.camel.component.ActiveObjectComponent import se.scalablesolutions.akka.util.Logging /** @@ -26,7 +27,18 @@ trait CamelContextLifecycle extends Logging { private var _started = false /** - * Returns the managed CamelContext. + * Camel component for accessing active objects. + */ + private[camel] var activeObjectComponent: ActiveObjectComponent = _ + + /** + * Registry in which active objects are TEMPORARILY registered during + * creation of Camel routes to active objects. + */ + private[camel] var activeObjectRegistry: java.util.Map[String, AnyRef] = _ + + /** + * Returns the managed CamelContext. */ protected def context: CamelContext = _context @@ -80,8 +92,11 @@ trait CamelContextLifecycle extends Logging { * caching they can do so after this method returned and prior to calling start. */ def init(context: CamelContext) { + this.activeObjectComponent = new ActiveObjectComponent + this.activeObjectRegistry = activeObjectComponent.getActiveObjectRegistry this.context = context this.context.setStreamCaching(true) + this.context.addComponent(ActiveObjectComponent.DEFAULT_SCHEMA, activeObjectComponent) this.template = context.createProducerTemplate _initialized = true log.info("Camel context initialized") diff --git a/akka-camel/src/main/scala/service/CamelService.scala b/akka-camel/src/main/scala/service/CamelService.scala index 80fa603897..6cf3a607cc 100644 --- a/akka-camel/src/main/scala/service/CamelService.scala +++ b/akka-camel/src/main/scala/service/CamelService.scala @@ -4,8 +4,8 @@ package se.scalablesolutions.akka.camel.service -import se.scalablesolutions.akka.actor.ActorRegistry import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{AspectInitRegistry, ActorRegistry} import se.scalablesolutions.akka.camel.CamelContextManager import se.scalablesolutions.akka.util.{Bootable, Logging} @@ -21,7 +21,13 @@ trait CamelService extends Bootable with Logging { import CamelContextManager._ private[camel] val consumerPublisher = actorOf[ConsumerPublisher] - private[camel] val publishRequestor = actorOf(new PublishRequestor(consumerPublisher)) + private[camel] val publishRequestor = actorOf[PublishRequestor] + + // add listener for actor registration events + ActorRegistry.addListener(publishRequestor) + + // add listener for AspectInit registration events + AspectInitRegistry.addListener(publishRequestor) /** * Starts the CamelService. Any started actor that is a consumer actor will be (asynchronously) @@ -38,19 +44,16 @@ trait CamelService extends Bootable with Logging { // start actor that exposes consumer actors via Camel endpoints consumerPublisher.start - // add listener for actor registration events - ActorRegistry.addRegistrationListener(publishRequestor.start) - - // publish already registered consumer actors - for (actor <- ActorRegistry.actors; event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event + // init publishRequestor so that buffered and future events are delivered to consumerPublisher + publishRequestor ! PublishRequestorInit(consumerPublisher) } /** * Stops the CamelService. */ abstract override def onUnload = { - ActorRegistry.removeRegistrationListener(publishRequestor) - publishRequestor.stop + ActorRegistry.removeListener(publishRequestor) + AspectInitRegistry.removeListener(publishRequestor) consumerPublisher.stop stop super.onUnload @@ -82,5 +85,14 @@ object CamelService { /** * Creates a new CamelService instance. */ - def newInstance: CamelService = new CamelService {} + def newInstance: CamelService = new DefaultCamelService } + +/** + * Default CamelService implementation to be created in Java applications with + *
+ * CamelService service = new DefaultCamelService()
+ * 
+ */ +class DefaultCamelService extends CamelService { +} \ No newline at end of file diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala index 167f3acaa1..5d7ca23961 100644 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -4,14 +4,17 @@ package se.scalablesolutions.akka.camel.service import java.io.InputStream +import java.lang.reflect.Method import java.util.concurrent.CountDownLatch import org.apache.camel.builder.RouteBuilder -import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorRef} +import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.actor.annotation.consume import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager} import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.camel.component.ActiveObjectComponent +import collection.mutable.ListBuffer /** * Actor that publishes consumer actors as Camel endpoints at the CamelContext managed @@ -22,20 +25,29 @@ import se.scalablesolutions.akka.util.Logging * @author Martin Krasser */ class ConsumerPublisher extends Actor with Logging { - @volatile private var publishLatch = new CountDownLatch(0) - @volatile private var unpublishLatch = new CountDownLatch(0) + + // TODO: redesign waiting (actor testing) mechanism + // - do not use custom methods on actor + // - only interact by passing messages + // TODO: factor out code that is needed for testing + + @volatile private var publishActorLatch = new CountDownLatch(0) + @volatile private var unpublishActorLatch = new CountDownLatch(0) /** - * Adds a route to the actor identified by a Publish message to the global CamelContext. + * Adds a route to the actor identified by a Publish message to the global CamelContext. */ protected def receive = { case r: ConsumerRegistered => { handleConsumerRegistered(r) - publishLatch.countDown // needed for testing only. + publishActorLatch.countDown // needed for testing only. } case u: ConsumerUnregistered => { handleConsumerUnregistered(u) - unpublishLatch.countDown // needed for testing only. + unpublishActorLatch.countDown // needed for testing only. + } + case d: ConsumerMethodRegistered => { + handleConsumerMethodDetected(d) } case _ => { /* ignore */} } @@ -43,24 +55,24 @@ class ConsumerPublisher extends Actor with Logging { /** * Sets the expected number of actors to be published. Used for testing only. */ - private[camel] def expectPublishCount(count: Int): Unit = - publishLatch = new CountDownLatch(count) + private[camel] def expectPublishActorCount(count: Int): Unit = + publishActorLatch = new CountDownLatch(count) /** * Sets the expected number of actors to be unpublished. Used for testing only. */ - private[camel] def expectUnpublishCount(count: Int): Unit = - unpublishLatch = new CountDownLatch(count) + private[camel] def expectUnpublishActorCount(count: Int): Unit = + unpublishActorLatch = new CountDownLatch(count) /** * Waits for the expected number of actors to be published. Used for testing only. */ - private[camel] def awaitPublish = publishLatch.await + private[camel] def awaitPublishActor = publishActorLatch.await /** * Waits for the expected number of actors to be unpublished. Used for testing only. */ - private[camel] def awaitUnpublish = unpublishLatch.await + private[camel] def awaitUnpublishActor = unpublishActorLatch.await /** * Creates a route to the registered consumer actor. @@ -77,6 +89,17 @@ class ConsumerPublisher extends Actor with Logging { CamelContextManager.context.stopRoute(event.id) log.info("unpublished actor %s (%s) from endpoint %s" format (event.clazz, event.id, event.uri)) } + + private def handleConsumerMethodDetected(event: ConsumerMethodRegistered) { + // using the actor uuid is highly experimental + val objectId = event.init.actorRef.uuid + val targetClass = event.init.target.getName + val targetMethod = event.method.getName + + CamelContextManager.activeObjectRegistry.put(objectId, event.activeObject) + CamelContextManager.context.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod)) + log.info("published method %s.%s (%s) at endpoint %s" format (targetClass, targetMethod, objectId, event.uri)) + } } /** @@ -90,6 +113,12 @@ class ConsumerPublisher extends Actor with Logging { * @author Martin Krasser */ class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends RouteBuilder { + // + // + // TODO: factor out duplicated code from ConsumerRoute and ConsumerMethodRoute + // + // + // TODO: make conversions configurable private val bodyConversions = Map( "file" -> classOf[InputStream] @@ -106,20 +135,69 @@ class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends private def actorUri = (if (uuid) "actor:uuid:%s" else "actor:id:%s") format id } +class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends RouteBuilder { + // + // + // TODO: factor out duplicated code from ConsumerRoute and ConsumerMethodRoute + // + // + + // TODO: make conversions configurable + private val bodyConversions = Map( + "file" -> classOf[InputStream] + ) + + def configure = { + val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." + bodyConversions.get(schema) match { + case Some(clazz) => from(endpointUri).convertBodyTo(clazz).to(activeObjectUri) + case None => from(endpointUri).to(activeObjectUri) + } + } + + private def activeObjectUri = "%s:%s?method=%s" format (ActiveObjectComponent.DEFAULT_SCHEMA, id, method) +} + /** * A registration listener that triggers publication and un-publication of consumer actors. * * @author Martin Krasser */ -class PublishRequestor(consumerPublisher: ActorRef) extends Actor { +class PublishRequestor extends Actor { + private val events = ListBuffer[ConsumerEvent]() + private var publisher: Option[ActorRef] = None + protected def receive = { - case ActorRegistered(actor) => for (event <- ConsumerRegistered.forConsumer(actor)) consumerPublisher ! event - case ActorUnregistered(actor) => for (event <- ConsumerUnregistered.forConsumer(actor)) consumerPublisher ! event + case ActorRegistered(actor) => + for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event) + case ActorUnregistered(actor) => + for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event) + case AspectInitRegistered(proxy, init) => + for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event) + case PublishRequestorInit(pub) => { + publisher = Some(pub) + deliverBufferedEvents + } + case _ => { /* ignore */ } + } + + private def deliverCurrentEvent(event: ConsumerEvent) = { + publisher match { + case Some(pub) => pub ! event + case None => events += event + } + } + + private def deliverBufferedEvents = { + for (event <- events) deliverCurrentEvent(event) + events.clear } } +case class PublishRequestorInit(consumerPublisher: ActorRef) + /** - * Consumer actor lifecycle event. + * A consumer event. * * @author Martin Krasser */ @@ -151,6 +229,8 @@ case class ConsumerRegistered(clazz: String, uri: String, id: String, uuid: Bool */ case class ConsumerUnregistered(clazz: String, uri: String, id: String, uuid: Boolean) extends ConsumerEvent +case class ConsumerMethodRegistered(activeObject: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent + /** * @author Martin Krasser */ @@ -179,6 +259,17 @@ private[camel] object ConsumerUnregistered { } } +private[camel] object ConsumerMethodRegistered { + def forConsumer(activeObject: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = { + // TODO: support/test annotations on interface methods + // TODO: support/test annotations on superclass methods + // TODO: document that overloaded methods are not supported + if (init.remoteAddress.isDefined) Nil // let remote node publish active object methods on endpoints + else for (m <- activeObject.getClass.getMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) + yield ConsumerMethodRegistered(activeObject, init, m.getAnnotation(classOf[consume]).value, m) + } +} + /** * Describes a consumer actor with elements that are relevant for publishing an actor at a * Camel endpoint (or unpublishing an actor from an endpoint). diff --git a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala index fd106f799f..26b31daffa 100644 --- a/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceFeatureTest.scala @@ -33,21 +33,23 @@ object CamelServiceFeatureTest { class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen { import CamelServiceFeatureTest._ - var service: CamelService = CamelService.newInstance + var service: CamelService = _ override protected def beforeAll = { ActorRegistry.shutdownAll + // create new CamelService instance + service = CamelService.newInstance // register test consumer before starting the CamelService actorOf(new TestConsumer("direct:publish-test-1")).start - // Consigure a custom camel route + // Configure a custom camel route CamelContextManager.init CamelContextManager.context.addRoutes(new TestRoute) // set expectations for testing purposes - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1) + service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishActorCount(1) // start the CamelService service.load // await publication of first test consumer - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish + service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublishActor } override protected def afterAll = { @@ -60,9 +62,9 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi scenario("access registered consumer actors via Camel direct-endpoints") { given("two consumer actors registered before and after CamelService startup") - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1) + service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishActorCount(1) actorOf(new TestConsumer("direct:publish-test-2")).start - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish + service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublishActor when("requests are sent to these actors") val response1 = CamelContextManager.template.requestBody("direct:publish-test-1", "msg1") @@ -81,14 +83,14 @@ class CamelServiceFeatureTest extends FeatureSpec with BeforeAndAfterAll with Gi given("a consumer actor that has been stopped") assert(CamelContextManager.context.hasEndpoint(endpointUri) eq null) - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishCount(1) + service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectPublishActorCount(1) val consumer = actorOf(new TestConsumer(endpointUri)).start - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublish + service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitPublishActor assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null) - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishCount(1) + service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].expectUnpublishActorCount(1) consumer.stop - service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublish + service.consumerPublisher.actor.asInstanceOf[ConsumerPublisher].awaitUnpublishActor // endpoint is still there but the route has been stopped assert(CamelContextManager.context.hasEndpoint(endpointUri) ne null) diff --git a/akka-camel/src/test/scala/service/PublishRequestorTest.scala b/akka-camel/src/test/scala/service/PublishRequestorTest.scala index a7f1685de1..f8fbd7841e 100644 --- a/akka-camel/src/test/scala/service/PublishRequestorTest.scala +++ b/akka-camel/src/test/scala/service/PublishRequestorTest.scala @@ -3,10 +3,11 @@ package se.scalablesolutions.akka.camel.service import _root_.org.junit.{Before, After, Test} import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered} +import se.scalablesolutions.akka.actor.Actor._ + import se.scalablesolutions.akka.camel.Consumer import se.scalablesolutions.akka.camel.support.{Receive, Countdown} -import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry, ActorRegistered, ActorUnregistered} -import Actor._ object PublishRequestorTest { class PublisherMock extends Actor with Receive[ConsumerEvent] { @@ -28,11 +29,13 @@ class PublishRequestorTest extends JUnitSuite { @Before def setUp = { publisher = actorOf(new PublisherMock with Countdown[ConsumerEvent]).start - requestor = actorOf(new PublishRequestor(publisher)).start + requestor = actorOf(new PublishRequestor).start + requestor ! PublishRequestorInit(publisher) consumer = actorOf(new Actor with Consumer { def endpointUri = "mock:test" protected def receive = null }).start + } @After def tearDown = { diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java index 11552609f5..1a8e5c8db6 100644 --- a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java +++ b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/consume.java @@ -10,7 +10,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.TYPE) +@Target({ElementType.TYPE, ElementType.METHOD}) public @interface consume { public abstract String value(); diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 9ec943cfc6..2d83cf2ba4 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -476,7 +476,7 @@ object ActiveObject extends Logging { } -private[akka] object AspectInitRegistry { +private[akka] object AspectInitRegistry extends ListenerManagement { private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit] def initFor(target: AnyRef) = { @@ -485,9 +485,16 @@ private[akka] object AspectInitRegistry { init } - def register(target: AnyRef, init: AspectInit) = initializations.put(target, init) + def register(target: AnyRef, init: AspectInit) = { + val res = initializations.put(target, init) + foreachListener(_ ! AspectInitRegistered(target, init)) + res + } } +private[akka] sealed trait AspectInitRegistryEvent +private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent + private[akka] sealed case class AspectInit( val target: Class[_], val actorRef: ActorRef, diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index ea77b2c6b7..5943985b57 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -4,11 +4,10 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.util.Logging - import scala.collection.mutable.ListBuffer import scala.reflect.Manifest import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} +import se.scalablesolutions.akka.util.ListenerManagement sealed trait ActorRegistryEvent case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent @@ -26,11 +25,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * * @author Jonas Bonér */ -object ActorRegistry extends Logging { +object ActorRegistry extends ListenerManagement { private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] private val actorsById = new ConcurrentHashMap[String, List[ActorRef]] private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]] - private val registrationListeners = new CopyOnWriteArrayList[ActorRef] /** * Returns all actors in the system. @@ -135,29 +133,4 @@ object ActorRegistry extends Logging { actorsByClassName.clear log.info("All actors have been shut down and unregistered from ActorRegistry") } - - /** - * Adds the registration listener this this registry's listener list. - */ - def addRegistrationListener(listener: ActorRef) = { - listener.start - registrationListeners.add(listener) - } - - /** - * Removes the registration listener this this registry's listener list. - */ - def removeRegistrationListener(listener: ActorRef) = { - listener.stop - registrationListeners.remove(listener) - } - - private def foreachListener(f: (ActorRef) => Unit) { - val iterator = registrationListeners.iterator - while (iterator.hasNext) { - val listener = iterator.next - if (listener.isRunning) f(listener) - else log.warning("Can't send ActorRegistryEvent to [%s] since it is not running.", listener) - } - } } diff --git a/akka-core/src/main/scala/util/ListenerManagement.scala b/akka-core/src/main/scala/util/ListenerManagement.scala new file mode 100644 index 0000000000..bdee9d1c1c --- /dev/null +++ b/akka-core/src/main/scala/util/ListenerManagement.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.util + +import java.util.concurrent.CopyOnWriteArrayList + +import se.scalablesolutions.akka.actor.ActorRef + +/** + * A manager for listener actors. Intended for mixin by observables. + * + * @author Martin Krasser + */ +trait ListenerManagement extends Logging { + + private val listeners = new CopyOnWriteArrayList[ActorRef] + + /** + * Adds the listener this this registry's listener list. + * The listener is started by this method. + */ + def addListener(listener: ActorRef) = { + listener.start + listeners.add(listener) + } + + /** + * Removes the listener this this registry's listener list. + * The listener is stopped by this method. + */ + def removeListener(listener: ActorRef) = { + listener.stop + listeners.remove(listener) + } + + /** + * Execute f with each listener as argument. + */ + protected def foreachListener(f: (ActorRef) => Unit) { + val iterator = listeners.iterator + while (iterator.hasNext) { + val listener = iterator.next + if (listener.isRunning) f(listener) + else log.warning("Can't notify [%s] since it is not running.", listener) + } + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java new file mode 100644 index 0000000000..af1ccaf8c4 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java @@ -0,0 +1,24 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class Consumer10 { + + @consume("file:data/input2") + public void foo(String body) { + System.out.println("Received message:"); + System.out.println(body); + } + + @consume("jetty:http://0.0.0.0:8877/camel/active") + public String bar(@Body String body, @Header("name") String header) { + return String.format("%s %s", body, header); + } + +} diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala index 93aaa08ff6..157ae195ea 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -37,7 +37,7 @@ class Producer1 extends Actor with Producer { } class Consumer1 extends Actor with Consumer with Logging { - def endpointUri = "file:data/input" + def endpointUri = "file:data/input1" def receive = { case msg: Message => log.info("received %s" format msg.bodyAs[String]) diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index 5b08e15a1a..29e65c3051 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -6,10 +6,10 @@ import org.apache.camel.impl.DefaultCamelContext import org.apache.camel.spring.spi.ApplicationContextRegistry import org.springframework.context.support.ClassPathXmlApplicationContext -import se.scalablesolutions.akka.actor.SupervisorFactory import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.camel.CamelContextManager import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.actor.{ActiveObject, SupervisorFactory} /** * @author Martin Krasser @@ -62,6 +62,9 @@ class Boot { actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again. + + // Publish active object methods on endpoints + ActiveObject.newInstance(classOf[Consumer10]) } class CustomRouteBuilder extends RouteBuilder {