diff --git a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml new file mode 100644 index 0000000000..fc069ffd71 --- /dev/null +++ b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/dispatcher-config.xml @@ -0,0 +1,81 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + java.io.IOException + java.lang.NullPointerException + + + + + + + + + + + + + + \ No newline at end of file diff --git a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/supervisor-config.xml b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/supervisor-config.xml new file mode 100644 index 0000000000..c763bf47d1 --- /dev/null +++ b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/supervisor-config.xml @@ -0,0 +1,87 @@ + + + + + + + java.io.IOException + java.lang.NullPointerException + + + + + + + + + + + + + + + java.lang.Exception + + + + + + + + + + + + + + + java.lang.Exception + + + + + + + + + + + + + + + + + java.lang.Exception + + + + + + + + + + java.lang.IOException + + + + + + + + + \ No newline at end of file diff --git a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml index 0b35ade59f..7e0c212382 100644 --- a/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml +++ b/akka-spring/akka-spring-test-java/src/main/resources/se/scalablesolutions/akka/spring/foo/test-config.xml @@ -1,76 +1,75 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - java.io.IOException - java.lang.NullPointerException - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + java.io.IOException + java.lang.NullPointerException + + + + + + + + + + - - - - java.lang.Exception - - - - - - - \ No newline at end of file diff --git a/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/ActiveObjectConfigurationTest.java b/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/ActiveObjectConfigurationTest.java new file mode 100644 index 0000000000..7cc691f3e3 --- /dev/null +++ b/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/ActiveObjectConfigurationTest.java @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.beans.factory.xml.XmlBeanDefinitionReader; +import org.springframework.context.ApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + +import se.scalablesolutions.akka.config.Config; +import se.scalablesolutions.akka.dispatch.FutureTimeoutException; +import se.scalablesolutions.akka.remote.RemoteNode; +import se.scalablesolutions.akka.spring.foo.MyPojo; + +/** + * Tests for spring configuration of active objects and supervisor configuration. + * @author michaelkober + */ +public class ActiveObjectConfigurationTest { + + private ApplicationContext context = null; + + @Before + public void setUp() { + context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/test-config.xml"); + } + + /** + * Tests that the <akka:active-object/> and <akka:supervision/> and <akka:dispatcher/> element + * can be used as a top level element. + */ + @Test + public void testParse() throws Exception { + final Resource CONTEXT = new ClassPathResource("se/scalablesolutions/akka/spring/foo/test-config.xml"); + DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory(); + XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(beanFactory); + reader.loadBeanDefinitions(CONTEXT); + assertTrue(beanFactory.containsBeanDefinition("simple-active-object")); + assertTrue(beanFactory.containsBeanDefinition("remote-active-object")); + assertTrue(beanFactory.containsBeanDefinition("supervision1")); + assertTrue(beanFactory.containsBeanDefinition("dispatcher1")); + } + + @Test + public void testSimpleActiveObject() { + MyPojo myPojo = (MyPojo) context.getBean("simple-active-object"); + String msg = myPojo.getFoo(); + msg += myPojo.getBar(); + assertEquals("wrong invocation order", "foobar", msg); + } + + @Test(expected = FutureTimeoutException.class) + public void testSimpleActiveObject_Timeout() { + MyPojo myPojo = (MyPojo) context.getBean("simple-active-object"); + myPojo.longRunning(); + } + + @Test + public void testSimpleActiveObject_NoTimeout() { + MyPojo myPojo = (MyPojo) context.getBean("simple-active-object-long-timeout"); + String msg = myPojo.longRunning(); + assertEquals("this took long", msg); + } + + @Test + public void testTransactionalActiveObject() { + MyPojo myPojo = (MyPojo) context.getBean("transactional-active-object"); + String msg = myPojo.getFoo(); + msg += myPojo.getBar(); + assertEquals("wrong invocation order", "foobar", msg); + } + + @Test + public void testRemoteActiveObject() { + new Thread(new Runnable() { + public void run() { + RemoteNode.start(); + } + }).start(); + try { + Thread.currentThread().sleep(1000); + } catch (Exception e) { + } + Config.config(); + + MyPojo myPojo = (MyPojo) context.getBean("remote-active-object"); + assertEquals("foo", myPojo.getFoo()); + } + + +} diff --git a/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/DispatcherConfigurationTest.java b/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/DispatcherConfigurationTest.java new file mode 100644 index 0000000000..9f941e4142 --- /dev/null +++ b/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/DispatcherConfigurationTest.java @@ -0,0 +1,143 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.context.ApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenDispatcher; +import se.scalablesolutions.akka.dispatch.ReactorBasedThreadPoolEventDrivenDispatcher; +import se.scalablesolutions.akka.dispatch.ReactorBasedSingleThreadEventDrivenDispatcher; +import se.scalablesolutions.akka.dispatch.MessageDispatcher; +import se.scalablesolutions.akka.dispatch.ThreadPoolBuilder; + +import se.scalablesolutions.akka.spring.foo.MyPojo; + +/** + * Tests for spring configuration of dispatcher configuration. + * @author michaelkober + */ +public class DispatcherConfigurationTest { + + private ApplicationContext context = null; + + @Before + public void setUp() { + context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/dispatcher-config.xml"); + } + + /** + * test for executor-event-driven-dispatcher with array-blocking-queue + */ + @Test + public void testDispatcher() { + MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-1"); + ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher); + assertEquals("wrong core pool size", 1, executor.getCorePoolSize()); + assertEquals("wrong max pool size", 20, executor.getMaximumPoolSize()); + assertEquals("wrong keep alive", 3000, executor.getKeepAliveTime(TimeUnit.MILLISECONDS)); + assertTrue("wrong queue type",executor.getQueue() instanceof ArrayBlockingQueue); + assertEquals("wrong capacity", 100, executor.getQueue().remainingCapacity()); + } + + /** + * test for dispatcher via ref + */ + @Test + public void testDispatcherRef() { + MyPojo pojo = (MyPojo) context.getBean("active-object-with-dispatcher-ref"); + assertNotNull(pojo); + } + + /** + * test for executor-event-driven-dispatcher with bounded-linked-blocking-queue with unbounded capacity + */ + @Test + public void testDispatcherWithBoundedLinkedBlockingQueueWithUnboundedCapacity() { + MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-2"); + ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher); + assertTrue("wrong queue type", executor.getQueue() instanceof LinkedBlockingQueue); + assertEquals("wrong capacity", Integer.MAX_VALUE, executor.getQueue().remainingCapacity()); + } + + /** + * test for executor-event-driven-dispatcher with unbounded-linked-blocking-queue with bounded capacity + */ + @Test + public void testDispatcherWithLinkedBlockingQueueWithBoundedCapacity() { + MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-4"); + ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher); + assertTrue("wrong queue type", executor.getQueue() instanceof LinkedBlockingQueue); + assertEquals("wrong capacity", 55, executor.getQueue().remainingCapacity()); + } + + /** + * test for executor-event-driven-dispatcher with unbounded-linked-blocking-queue with unbounded capacity + */ + @Test + public void testDispatcherWithLinkedBlockingQueueWithUnboundedCapacity() { + MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-5"); + ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher); + assertTrue("wrong queue type", executor.getQueue() instanceof LinkedBlockingQueue); + assertEquals("wrong capacity", Integer.MAX_VALUE, executor.getQueue().remainingCapacity()); + } + + /** + * test for executor-event-driven-dispatcher with synchronous-queue + */ + @Test + public void testDispatcherWithSynchronousQueue() { + MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("executor-event-driven-dispatcher-6"); + ThreadPoolExecutor executor = getThreadPoolExecutorAndAssert(dispatcher); + assertTrue("wrong queue type", executor.getQueue() instanceof SynchronousQueue); + } + + /** + * test for reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue + */ + @Test + public void testReactorBasedThreadPoolDispatcherWithSynchronousQueue() { + MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("reactor-based-thread-pool-event-driven-dispatcher"); + assertTrue(dispatcher instanceof ReactorBasedThreadPoolEventDrivenDispatcher); + assertTrue(dispatcher instanceof ThreadPoolBuilder); + ThreadPoolBuilder pool = (ThreadPoolBuilder) dispatcher; + ThreadPoolExecutor executor = pool.se$scalablesolutions$akka$dispatch$ThreadPoolBuilder$$threadPoolBuilder(); + assertNotNull(executor); + assertTrue("wrong queue type", executor.getQueue() instanceof SynchronousQueue); + } + + /** + * test for reactor-based-single-thread-event-driven-dispatcher with synchronous-queue + */ + @Test + public void testReactorBasedSingleThreadDispatcherWithSynchronousQueue() { + MessageDispatcher dispatcher = (MessageDispatcher) context.getBean("reactor-based-single-thread-event-driven-dispatcher"); + assertTrue(dispatcher instanceof ReactorBasedSingleThreadEventDrivenDispatcher); + } + + /** + * Assert that dispatcher is correct type and get executor. + */ + private ThreadPoolExecutor getThreadPoolExecutorAndAssert(MessageDispatcher dispatcher) { + assertTrue(dispatcher instanceof ExecutorBasedEventDrivenDispatcher); + assertTrue(dispatcher instanceof ThreadPoolBuilder); + ThreadPoolBuilder pool = (ThreadPoolBuilder) dispatcher; + ThreadPoolExecutor executor = pool.se$scalablesolutions$akka$dispatch$ThreadPoolBuilder$$threadPoolBuilder(); + assertNotNull(executor); + return executor; + } + +} diff --git a/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/SpringConfigurationTest.java b/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/SpringConfigurationTest.java deleted file mode 100644 index cd5490d1d4..0000000000 --- a/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/SpringConfigurationTest.java +++ /dev/null @@ -1,120 +0,0 @@ -package se.scalablesolutions.akka.spring; - -import static org.junit.Assert.*; - -import org.junit.Before; -import org.junit.Test; - -import org.springframework.beans.factory.support.DefaultListableBeanFactory; -import org.springframework.beans.factory.xml.XmlBeanDefinitionReader; -import org.springframework.context.ApplicationContext; -import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.core.io.ClassPathResource; -import org.springframework.core.io.Resource; - -import se.scalablesolutions.akka.config.ActiveObjectConfigurator; -import se.scalablesolutions.akka.dispatch.FutureTimeoutException; -import se.scalablesolutions.akka.config.Config; -import se.scalablesolutions.akka.remote.RemoteNode; -import se.scalablesolutions.akka.spring.foo.Foo; -import se.scalablesolutions.akka.spring.foo.IBar; -import se.scalablesolutions.akka.spring.foo.MyPojo; -import se.scalablesolutions.akka.spring.foo.StatefulPojo; - -/** - * Tests for spring configuration of active objects and supervisor configuration. - */ -public class SpringConfigurationTest { - - private ApplicationContext context = null; - - @Before - public void setUp() { - context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/test-config.xml"); - } - - /** - * Tests that the <akka:active-object/> and <akka:supervision/> element - * can be used as a top level element. - */ - @Test - public void testParse() throws Exception { - final Resource CONTEXT = new ClassPathResource("se/scalablesolutions/akka/spring/foo/test-config.xml"); - DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory(); - XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(beanFactory); - reader.loadBeanDefinitions(CONTEXT); - assertTrue(beanFactory.containsBeanDefinition("simple-active-object")); - assertTrue(beanFactory.containsBeanDefinition("remote-active-object")); - assertTrue(beanFactory.containsBeanDefinition("supervision1")); - } - - @Test - public void testSimpleActiveObject() { - MyPojo myPojo = (MyPojo) context.getBean("simple-active-object"); - String msg = myPojo.getFoo(); - msg += myPojo.getBar(); - assertEquals("wrong invocation order", "foobar", msg); - } - - @Test(expected=FutureTimeoutException.class) - public void testSimpleActiveObject_Timeout() { - MyPojo myPojo = (MyPojo) context.getBean("simple-active-object"); - myPojo.longRunning(); - } - - @Test - public void testSimpleActiveObject_NoTimeout() { - MyPojo myPojo = (MyPojo) context.getBean("simple-active-object-long-timeout"); - String msg = myPojo.longRunning(); - assertEquals("this took long", msg); - } - - @Test - public void testTransactionalActiveObject() { - MyPojo myPojo = (MyPojo) context.getBean("transactional-active-object"); - String msg = myPojo.getFoo(); - msg += myPojo.getBar(); - assertEquals("wrong invocation order", "foobar", msg); - } - - @Test - public void testRemoteActiveObject() { - new Thread(new Runnable() { - public void run() { - RemoteNode.start(); - } - }).start(); - try { Thread.currentThread().sleep(1000); } catch (Exception e) {} - Config.config(); - - MyPojo myPojo = (MyPojo) context.getBean("remote-active-object"); - assertEquals("foo", myPojo.getFoo()); - } - - @Test - public void testSupervision() { - // get ActiveObjectConfigurator bean from spring context - ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context.getBean("supervision1"); - // get ActiveObjects - Foo foo = myConfigurator.getInstance(Foo.class); - assertNotNull(foo); - IBar bar = myConfigurator.getInstance(IBar.class); - assertNotNull(bar); - MyPojo pojo = myConfigurator.getInstance(MyPojo.class); - assertNotNull(pojo); - } - - @Test - public void testTransactionalState() { - ActiveObjectConfigurator conf = (ActiveObjectConfigurator) context.getBean("supervision2"); - StatefulPojo stateful = conf.getInstance(StatefulPojo.class); - stateful.init(); - stateful.setMapState("testTransactionalState", "some map state"); - stateful.setVectorState("some vector state"); - stateful.setRefState("some ref state"); - assertEquals("some map state", stateful.getMapState("testTransactionalState")); - assertEquals("some vector state", stateful.getVectorState()); - assertEquals("some ref state", stateful.getRefState()); - } - -} diff --git a/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/SupervisorConfigurationTest.java b/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/SupervisorConfigurationTest.java new file mode 100644 index 0000000000..23af0749dd --- /dev/null +++ b/akka-spring/akka-spring-test-java/src/test/java/se/scalablesolutions/akka/spring/SupervisorConfigurationTest.java @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.context.ApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import se.scalablesolutions.akka.config.ActiveObjectConfigurator; +import se.scalablesolutions.akka.spring.foo.Foo; +import se.scalablesolutions.akka.spring.foo.IBar; +import se.scalablesolutions.akka.spring.foo.MyPojo; +import se.scalablesolutions.akka.spring.foo.StatefulPojo; + + +/** + * Testclass for supervisor configuration. + * @author michaelkober + * + */ +public class SupervisorConfigurationTest { + + private ApplicationContext context = null; + + @Before + public void setUp() { + context = new ClassPathXmlApplicationContext("se/scalablesolutions/akka/spring/foo/supervisor-config.xml"); + } + + @Test + public void testSupervision() { + // get ActiveObjectConfigurator bean from spring context + ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context.getBean("supervision1"); + // get ActiveObjects + Foo foo = myConfigurator.getInstance(Foo.class); + assertNotNull(foo); + IBar bar = myConfigurator.getInstance(IBar.class); + assertNotNull(bar); + MyPojo pojo = myConfigurator.getInstance(MyPojo.class); + assertNotNull(pojo); + } + + @Test + public void testTransactionalState() { + ActiveObjectConfigurator conf = (ActiveObjectConfigurator) context.getBean("supervision2"); + StatefulPojo stateful = conf.getInstance(StatefulPojo.class); + stateful.init(); + stateful.setMapState("testTransactionalState", "some map state"); + stateful.setVectorState("some vector state"); + stateful.setRefState("some ref state"); + assertEquals("some map state", stateful.getMapState("testTransactionalState")); + assertEquals("some vector state", stateful.getVectorState()); + assertEquals("some ref state", stateful.getRefState()); + } + + @Test + public void testSupervisionWithDispatcher() { + ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context.getBean("supervision-with-dispatcher"); + // get ActiveObjects + Foo foo = myConfigurator.getInstance(Foo.class); + assertNotNull(foo); + // TODO how to check dispatcher? + } + +} diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd index 134e53e82f..b6b4d41f89 100644 --- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd +++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka.xsd @@ -1,163 +1,226 @@ - - + targetNamespace="http://www.akkasource.org/schema/akka" + elementFormDefault="qualified" attributeFormDefault="unqualified" + xmlns:xsd="http://www.w3.org/2001/XMLSchema"> - + + + + + - - - - + + + + - - - - - - - - - - - - - - Name of the remote host. - - - - - - - Port of the remote host. - - - - - - - - - - - Pre restart callback method that is called during restart. - - - - - - - Post restart callback method that is called during restart. - - - - - - - - - - - - - - - - Name of the target class. - - - - - - - default timeout for '!!' invocations - - - - - - - Set to true if messages should have REQUIRES_NEW semantics - - - - - - - Interface implemented by target class. - - - - - - - Lifecycle, permanent or temporary - - - - - - - - - - - - - - - - - - - - - - - - - - - - Failover scheme, AllForOne or OneForOne - - - - - - - Maximal number of retries. - - - - - - - Timerange for restart. - - - - - - - + + + + + + - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the remote host. + + + + + + + Port of the remote host. + + + + + + + + + + + Pre restart callback method that is called during restart. + + + + + + + Post restart callback method that is called during restart. + + + + + + + + + + + + + + + + + + Name of the target class. + + + + + + + default timeout for '!!' invocations + + + + + + + Set to true if messages should have REQUIRES_NEW semantics + + + + + + + Interface implemented by target class. + + + + + + + Lifecycle, permanent or temporary + + + + + + + + + + + + + + + + + + + + + + + + + + + + Failover scheme, AllForOne or OneForOne + + + + + + + Maximal number of retries. + + + + + + + Timerange for restart. + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-spring/src/main/scala/ActiveObjectBeanDefinitionParser.scala b/akka-spring/src/main/scala/ActiveObjectBeanDefinitionParser.scala index e4b976188d..d22029e5ce 100644 --- a/akka-spring/src/main/scala/ActiveObjectBeanDefinitionParser.scala +++ b/akka-spring/src/main/scala/ActiveObjectBeanDefinitionParser.scala @@ -3,86 +3,27 @@ */ package se.scalablesolutions.akka.spring -import org.springframework.util.xml.DomUtils -import se.scalablesolutions.akka.util.Logging +import org.springframework.beans.factory.support.BeanDefinitionBuilder +import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser +import org.springframework.beans.factory.xml.ParserContext import org.w3c.dom.Element + /** - * Parser for custom namespace configuration for active-object. + * Parser for custom namespace configuration. * @author michaelkober */ -trait ActiveObjectBeanDefinitionParser extends Logging { - import AkkaSpringConfigurationTags._ - - /** - * Parses the given element and returns a ActiveObjectProperties. - * @param element dom element to parse - * @return configuration for the active object +class ActiveObjectBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) */ - def parseActiveObject(element: Element): ActiveObjectProperties = { - val objectProperties = new ActiveObjectProperties() - val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG); - val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG); - - if (remoteElement != null) { - objectProperties.host = mandatory(remoteElement, HOST) - objectProperties.port = mandatory(remoteElement, PORT).toInt - } - - if (callbacksElement != null) { - objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART) - objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART) - if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) { - throw new IllegalStateException("At least one of pre or post must be defined.") - } - } - - try { - objectProperties.timeout = mandatory(element, TIMEOUT).toLong - } catch { - case nfe: NumberFormatException => - log.error(nfe, "could not parse timeout %s", element.getAttribute(TIMEOUT)) - throw nfe - } - - objectProperties.target = mandatory(element, TARGET) - objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean - - if (!element.getAttribute(INTERFACE).isEmpty) { - objectProperties.interface = element.getAttribute(INTERFACE) - } - - if (!element.getAttribute(LIFECYCLE).isEmpty) { - objectProperties.lifecyclye = element.getAttribute(LIFECYCLE) - } - objectProperties + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val activeObjectConf = parseActiveObject(element) + activeObjectConf.setAsProperties(builder) } - /** - * Get a mandatory element attribute. - * @param element the element with the mandatory attribute - * @param attribute name of the mandatory attribute + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) */ - def mandatory(element: Element, attribute: String): String = { - if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { - throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) - } else { - element.getAttribute(attribute) - } - } - - /** - * Get a mandatory child element. - * @param element the parent element - * @param childName name of the mandatory child element - */ - def mandatoryElement(element: Element, childName: String): Element = { - val childElement = DomUtils.getChildElementByTagName(element, childName); - if (childElement == null) { - throw new IllegalArgumentException("Mandatory element missing: ''") - } else { - childElement - } - } - + override def getBeanClass(element: Element) = classOf[ActiveObjectFactoryBean] } \ No newline at end of file diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index fb83c6fe09..c8dea6dfb9 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -8,8 +8,7 @@ import org.springframework.beans.factory.config.AbstractFactoryBean import se.scalablesolutions.akka.actor.ActiveObject import reflect.BeanProperty import se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks - - +import se.scalablesolutions.akka.dispatch.MessageDispatcher /** @@ -28,7 +27,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] { @BeanProperty var host: String = "" @BeanProperty var port: Int = _ @BeanProperty var lifecycle: String = "" - + @BeanProperty var dispatcher: DispatcherProperties = _ /* * @see org.springframework.beans.factory.FactoryBean#getObjectType() @@ -40,42 +39,68 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] { * @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance() */ def createInstance: AnyRef = { - ActiveObject.newInstance(target.toClass, timeout, transactional, restartCallbacks) - if (isRemote) { - newRemoteInstance(target, timeout, interface, transactional, restartCallbacks, host, port) + var argumentList = "" + if (isRemote) argumentList += "r" + if (hasInterface) argumentList += "i" + if (hasDispatcher) argumentList += "d" + create(argumentList) + } + +// TODO: check if this works in 2.8 (type inferred to Nothing instead of AnyRef here) +// +// private[akka] def create(argList : String) : AnyRef = argList match { +// case "r" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks) +// case "ri" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks) +// case "rd" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks) +// case "rid" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks) +// case "i" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks) +// case "id" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks) +// case "d" => ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks) +// case _ => ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks) +// } + + private[akka] def create(argList : String) : AnyRef = { + if (argList == "r") { + ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks) + } else if (argList == "ri" ) { + ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks) + } else if (argList == "rd") { + ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks) + } else if (argList == "rid") { + ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks) + } else if (argList == "i") { + ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks) + } else if (argList == "id") { + ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks) + } else if (argList == "d") { + ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks) } else { - newInstance(target, timeout, interface, transactional, restartCallbacks); + ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks) + } + } + + /** + * create Option[RestartCallback] + */ + private def callbacks: Option[RestartCallbacks] = { + if (hasCallbacks) { + val callbacks = new RestartCallbacks(pre, post) + Some(callbacks) + } else { + None } } private[akka] def isRemote = (host != null) && (!host.isEmpty) - /** - * create Option[RestartCallback] - */ - private def restartCallbacks: Option[RestartCallbacks] = { - if (((pre == null) || pre.isEmpty) && ((post == null) || post.isEmpty)) { - None - } else { - val callbacks = new RestartCallbacks(pre, post) - Some(callbacks) - } - } + private[akka] def hasInterface = (interface != null) && (!interface.isEmpty) - private def newInstance(target: String, timeout: Long, interface: String, transactional: Boolean, callbacks: Option[RestartCallbacks]): AnyRef = { - if ((interface == null) || interface.isEmpty) { - ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks) - } else { - ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks) - } - } + private[akka] def hasCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty) - private def newRemoteInstance(target: String, timeout: Long, interface: String, transactional: Boolean, callbacks: Option[RestartCallbacks], host: String, port: Int): AnyRef = { - if ((interface == null) || interface.isEmpty) { - ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks) - } else { - ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks) - } - } + private[akka] def hasDispatcher = (dispatcher != null) && (dispatcher.dispatcherType != null) && (!dispatcher.dispatcherType.isEmpty) + private[akka] def dispatcherInstance : MessageDispatcher = { + import DispatcherFactoryBean._ + createNewInstance(dispatcher) + } } \ No newline at end of file diff --git a/akka-spring/src/main/scala/ActiveObjectParser.scala b/akka-spring/src/main/scala/ActiveObjectParser.scala new file mode 100644 index 0000000000..9e64cce987 --- /dev/null +++ b/akka-spring/src/main/scala/ActiveObjectParser.scala @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.springframework.util.xml.DomUtils +import org.w3c.dom.Element + +/** + * Parser trait for custom namespace configuration for active-object. + * @author michaelkober + */ +trait ActiveObjectParser extends BeanParser with DispatcherParser { + import AkkaSpringConfigurationTags._ + + /** + * Parses the given element and returns a ActiveObjectProperties. + * @param element dom element to parse + * @return configuration for the active object + */ + def parseActiveObject(element: Element): ActiveObjectProperties = { + val objectProperties = new ActiveObjectProperties() + val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG); + val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG); + val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG) + + if (remoteElement != null) { + objectProperties.host = mandatory(remoteElement, HOST) + objectProperties.port = mandatory(remoteElement, PORT).toInt + } + + if (callbacksElement != null) { + objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART) + objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART) + if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) { + throw new IllegalStateException("At least one of pre or post must be defined.") + } + } + + if (dispatcherElement != null) { + val dispatcherProperties = parseDispatcher(dispatcherElement) + objectProperties.dispatcher = dispatcherProperties + } + + try { + objectProperties.timeout = mandatory(element, TIMEOUT).toLong + } catch { + case nfe: NumberFormatException => + log.error(nfe, "could not parse timeout %s", element.getAttribute(TIMEOUT)) + throw nfe + } + + objectProperties.target = mandatory(element, TARGET) + objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean + + if (!element.getAttribute(INTERFACE).isEmpty) { + objectProperties.interface = element.getAttribute(INTERFACE) + } + + if (!element.getAttribute(LIFECYCLE).isEmpty) { + objectProperties.lifecyclye = element.getAttribute(LIFECYCLE) + } + objectProperties + } + +} \ No newline at end of file diff --git a/akka-spring/src/main/scala/ActiveObjectProperties.scala b/akka-spring/src/main/scala/ActiveObjectProperties.scala index bd1f838d9b..528b44885c 100644 --- a/akka-spring/src/main/scala/ActiveObjectProperties.scala +++ b/akka-spring/src/main/scala/ActiveObjectProperties.scala @@ -21,6 +21,8 @@ class ActiveObjectProperties { var host: String = "" var port: Int = _ var lifecyclye: String = "" + var dispatcher: DispatcherProperties = _ + /** * Sets the properties to the given builder. @@ -36,6 +38,7 @@ class ActiveObjectProperties { builder.addPropertyValue(INTERFACE, interface) builder.addPropertyValue(TRANSACTIONAL, transactional) builder.addPropertyValue(LIFECYCLE, lifecyclye) + builder.addPropertyValue(DISPATCHER_TAG, dispatcher) } } \ No newline at end of file diff --git a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala index ebf70a7ae3..a0e47cb638 100644 --- a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala +++ b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala @@ -12,7 +12,8 @@ import AkkaSpringConfigurationTags._ */ class AkkaNamespaceHandler extends NamespaceHandlerSupport { def init = { - registerBeanDefinitionParser(ACTIVE_OBJECT_TAG, new AkkaObjectBeanDefinitionParser()); + registerBeanDefinitionParser(ACTIVE_OBJECT_TAG, new ActiveObjectBeanDefinitionParser()); registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser()); + registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser()); } } \ No newline at end of file diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala index 058d654ea7..236cce03a5 100644 --- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala +++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala @@ -8,17 +8,29 @@ package se.scalablesolutions.akka.spring * @author michaelkober */ object AkkaSpringConfigurationTags { + + // --- TAGS + // // top level tags val ACTIVE_OBJECT_TAG = "active-object" val SUPERVISION_TAG = "supervision" + val DISPATCHER_TAG = "dispatcher" + // active-object sub tags val RESTART_CALLBACKS_TAG = "restart-callbacks" - val REMOTE_TAG = "remote"; + val REMOTE_TAG = "remote" + // superivision sub tags val ACTIVE_OBJECTS_TAG = "active-objects" val STRATEGY_TAG = "restart-strategy" val TRAP_EXISTS_TAG = "trap-exits" val TRAP_EXIT_TAG = "trap-exit" + + // dispatcher sub tags + val THREAD_POOL_TAG = "thread-pool" + + // --- ATTRIBUTES + // // active object attributes val TIMEOUT = "timeout" val TARGET = "target" @@ -29,13 +41,53 @@ object AkkaSpringConfigurationTags { val PRE_RESTART = "pre" val POST_RESTART = "post" val LIFECYCLE = "lifecycle" + // supervision attributes val FAILOVER = "failover" val RETRIES = "retries" val TIME_RANGE = "timerange" - // Value types + + // dispatcher attributes + val NAME = "name" + val REF = "ref" + val TYPE = "type" + + // thread pool attributes + val QUEUE = "queue" + val CAPACITY = "capacity" + val FAIRNESS = "fairness" + val CORE_POOL_SIZE = "core-pool-size" + val MAX_POOL_SIZE = "max-pool-size" + val KEEP_ALIVE = "keep-alive" + val BOUND ="bound" + val REJECTION_POLICY ="rejection-policy" + + // --- VALUES + // + // Lifecycle val VAL_LIFECYCYLE_TEMPORARY = "temporary" val VAL_LIFECYCYLE_PERMANENT = "permanent" + + // Failover val VAL_ALL_FOR_ONE = "AllForOne" val VAL_ONE_FOR_ONE = "OneForOne" + + // rejection policies + val VAL_ABORT_POLICY = "abort-policy" + val VAL_CALLER_RUNS_POLICY = "caller-runs-policy" + val VAL_DISCARD_OLDEST_POLICY = "discard-oldest-policy" + val VAL_DISCARD_POLICY = "discard-policy" + + // dispatcher queue types + val VAL_BOUNDED_LINKED_BLOCKING_QUEUE = "bounded-linked-blocking-queue" + val VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE = "unbounded-linked-blocking-queue" + val VAL_SYNCHRONOUS_QUEUE = "synchronous-queue" + val VAL_BOUNDED_ARRAY_BLOCKING_QUEUE = "bounded-array-blocking-queue" + + // dispatcher types + val EXECUTOR_BASED_EVENT_DRIVEN = "executor-based-event-driven" + val REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN = "reactor-based-thread-pool-event-driven" + val REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN = "reactor-based-single-thread-event-driven" + val THREAD_BASED = "thread-based" + } \ No newline at end of file diff --git a/akka-spring/src/main/scala/BeanParser.scala b/akka-spring/src/main/scala/BeanParser.scala new file mode 100644 index 0000000000..b79db3c1a8 --- /dev/null +++ b/akka-spring/src/main/scala/BeanParser.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import se.scalablesolutions.akka.util.Logging +import org.w3c.dom.Element +import org.springframework.util.xml.DomUtils + +/** + * Base trait with utility methods for bean parsing. + */ +trait BeanParser extends Logging { + + /** + * Get a mandatory element attribute. + * @param element the element with the mandatory attribute + * @param attribute name of the mandatory attribute + */ + def mandatory(element: Element, attribute: String): String = { + if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { + throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) + } else { + element.getAttribute(attribute) + } + } + + /** + * Get a mandatory child element. + * @param element the parent element + * @param childName name of the mandatory child element + */ + def mandatoryElement(element: Element, childName: String): Element = { + val childElement = DomUtils.getChildElementByTagName(element, childName); + if (childElement == null) { + throw new IllegalArgumentException("Mandatory element missing: ''") + } else { + childElement + } + } + +} \ No newline at end of file diff --git a/akka-spring/src/main/scala/AkkaObjectBeanDefinitionParser.scala b/akka-spring/src/main/scala/DispatcherBeanDefinitionParser.scala similarity index 61% rename from akka-spring/src/main/scala/AkkaObjectBeanDefinitionParser.scala rename to akka-spring/src/main/scala/DispatcherBeanDefinitionParser.scala index 1f90c17454..731414581d 100644 --- a/akka-spring/src/main/scala/AkkaObjectBeanDefinitionParser.scala +++ b/akka-spring/src/main/scala/DispatcherBeanDefinitionParser.scala @@ -3,28 +3,26 @@ */ package se.scalablesolutions.akka.spring -import org.springframework.beans.factory.support.BeanDefinitionBuilder -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser -import org.springframework.beans.factory.xml.ParserContext import org.w3c.dom.Element -import se.scalablesolutions.akka.util.Logging +import org.springframework.beans.factory.support.BeanDefinitionBuilder +import org.springframework.beans.factory.xml.{ParserContext, AbstractSingleBeanDefinitionParser} /** * Parser for custom namespace configuration. * @author michaelkober */ -class AkkaObjectBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectBeanDefinitionParser { - /* +class DispatcherBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectParser with DispatcherParser { + /* * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) */ override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { - val activeObjectConf = parseActiveObject(element) - activeObjectConf.setAsProperties(builder) + val dispatcherProperties = parseDispatcher(element) + dispatcherProperties.setAsProperties(builder) } /* * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) */ - override def getBeanClass(element: Element) = classOf[ActiveObjectFactoryBean] + override def getBeanClass(element: Element) = classOf[DispatcherFactoryBean] } \ No newline at end of file diff --git a/akka-spring/src/main/scala/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/DispatcherFactoryBean.scala new file mode 100644 index 0000000000..c314c43f4a --- /dev/null +++ b/akka-spring/src/main/scala/DispatcherFactoryBean.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.springframework.beans.factory.config.AbstractFactoryBean +import se.scalablesolutions.akka.config.JavaConfig._ +import AkkaSpringConfigurationTags._ +import reflect.BeanProperty +import se.scalablesolutions.akka.dispatch.{ThreadPoolBuilder, Dispatchers, MessageDispatcher} +import java.util.concurrent.RejectedExecutionHandler +import java.util.concurrent.ThreadPoolExecutor.{DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy, AbortPolicy} + +/** + * Reusable factory method for dispatchers. + */ +object DispatcherFactoryBean { + def createNewInstance(properties: DispatcherProperties): MessageDispatcher = { + var dispatcher = properties.dispatcherType match { + case EXECUTOR_BASED_EVENT_DRIVEN => Dispatchers.newExecutorBasedEventDrivenDispatcher(properties.name) + case REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN => Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(properties.name) + case REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN => Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(properties.name) + case THREAD_BASED => throw new IllegalArgumentException("not implemented yet") //FIXME + case _ => throw new IllegalArgumentException("unknown dispatcher type") + } + if ((properties.threadPool != null) && (properties.threadPool.queue != null)) { + var threadPoolBuilder = dispatcher.asInstanceOf[ThreadPoolBuilder] + threadPoolBuilder = properties.threadPool.queue match { + case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness) + case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE => if (properties.threadPool.capacity > -1) + threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(properties.threadPool.capacity) + else + threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => threadPoolBuilder.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(properties.threadPool.bound) + case VAL_SYNCHRONOUS_QUEUE => threadPoolBuilder.withNewThreadPoolWithSynchronousQueueWithFairness(properties.threadPool.fairness) + case _ => throw new IllegalArgumentException("unknown queue type") + } + if (properties.threadPool.corePoolSize > -1) { + threadPoolBuilder.setCorePoolSize(properties.threadPool.corePoolSize) + } + if (properties.threadPool.maxPoolSize > -1) { + threadPoolBuilder.setMaxPoolSize(properties.threadPool.maxPoolSize) + } + if (properties.threadPool.keepAlive > -1) { + threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive) + } + if ((properties.threadPool.rejectionPolicy != null) && (!properties.threadPool.rejectionPolicy.isEmpty)) { + val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match { + case "abort-policy" => new AbortPolicy() + case "caller-runs-policy" => new CallerRunsPolicy() + case "discard-oldest-policy" => new DiscardOldestPolicy() + case "discard-policy" => new DiscardPolicy() + case _ => throw new IllegalArgumentException("Unknown rejection-policy '" + properties.threadPool.rejectionPolicy + "'") + } + threadPoolBuilder.setRejectionPolicy(policy) + } + threadPoolBuilder.asInstanceOf[MessageDispatcher] + } else { + dispatcher + } + } +} + +/** + * Factory bean for supervisor configuration. + * @author michaelkober + */ +class DispatcherFactoryBean extends AbstractFactoryBean[MessageDispatcher] { + @BeanProperty var properties: DispatcherProperties = _ + + /* + * @see org.springframework.beans.factory.FactoryBean#getObjectType() + */ + def getObjectType: Class[MessageDispatcher] = classOf[MessageDispatcher] + + /* + * @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance() + */ + def createInstance: MessageDispatcher = { + import DispatcherFactoryBean._ + createNewInstance(properties) + } +} \ No newline at end of file diff --git a/akka-spring/src/main/scala/DispatcherParser.scala b/akka-spring/src/main/scala/DispatcherParser.scala new file mode 100644 index 0000000000..e8b47979d0 --- /dev/null +++ b/akka-spring/src/main/scala/DispatcherParser.scala @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.w3c.dom.Element +import org.springframework.util.xml.DomUtils + +/** + * Parser trait for custom namespace for Akka dispatcher configuration. + * @author michaelkober + */ +trait DispatcherParser extends BeanParser { + import AkkaSpringConfigurationTags._ + + /** + * Parses the given element and returns a DispatcherProperties. + * @param element dom element to parse + * @return configuration for the dispatcher + */ + def parseDispatcher(element: Element): DispatcherProperties = { + val properties = new DispatcherProperties() + var dispatcherElement = element + if (hasRef(element)) { + val ref = element.getAttribute(REF) + dispatcherElement = element.getOwnerDocument.getElementById(ref) + if (dispatcherElement == null) { + throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'") + } + } + properties.name = mandatory(dispatcherElement, NAME) + properties.dispatcherType = mandatory(dispatcherElement, TYPE) + if (properties.dispatcherType == THREAD_BASED) { + if (dispatcherElement.getParentNode.getNodeName != "active-object") { + throw new IllegalArgumentException("Thread based dispatcher must be nested in active-object element!") + } + } + val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); + if (threadPoolElement != null) { + if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN || + properties.dispatcherType == THREAD_BASED) { + throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") + } + val threadPoolProperties = parseThreadPool(threadPoolElement) + properties.threadPool = threadPoolProperties + } + properties +} + + /** + * Parses the given element and returns a ThreadPoolProperties. + * @param element dom element to parse + * @return configuration for the thread pool + */ + def parseThreadPool(element: Element): ThreadPoolProperties = { + val properties = new ThreadPoolProperties() + properties.queue = element.getAttribute(QUEUE) + if (element.hasAttribute(CAPACITY)) { + properties.capacity = element.getAttribute(CAPACITY).toInt + } + if (element.hasAttribute(BOUND)) { + properties.bound = element.getAttribute(BOUND).toInt + } + if (element.hasAttribute(FAIRNESS)) { + properties.fairness = element.getAttribute(FAIRNESS).toBoolean + } + if (element.hasAttribute(CORE_POOL_SIZE)) { + properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt + } + if (element.hasAttribute(MAX_POOL_SIZE)) { + properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt + } + if (element.hasAttribute(KEEP_ALIVE)) { + properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong + } + if (element.hasAttribute(REJECTION_POLICY)) { + properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY) + } + properties + } + + def hasRef(element: Element): Boolean = { + val ref = element.getAttribute(REF) + (ref != null) && !ref.isEmpty + } + +} \ No newline at end of file diff --git a/akka-spring/src/main/scala/DispatcherProperties.scala b/akka-spring/src/main/scala/DispatcherProperties.scala new file mode 100644 index 0000000000..e35bb62d27 --- /dev/null +++ b/akka-spring/src/main/scala/DispatcherProperties.scala @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.springframework.beans.factory.support.BeanDefinitionBuilder + +/** + * Data container for dispatcher configuration data. + * @author michaelkober + */ +class DispatcherProperties { + var ref: String = "" + var dispatcherType: String = "" + var name: String = "" + var threadPool: ThreadPoolProperties = _ + + /** + * Sets the properties to the given builder. + * @param builder bean definition builder + */ + def setAsProperties(builder: BeanDefinitionBuilder) { + builder.addPropertyValue("properties", this) + } + + override def toString : String = { + "DispatcherProperties[ref=" + ref + + ", dispatcher-type=" + dispatcherType + + ", name=" + name + + ", threadPool=" + threadPool + "]" + } +} + +/** + * Data container for thread pool configuration data. + * @author michaelkober + */ +class ThreadPoolProperties { + var queue = "" + var bound = -1 + var capacity = -1 + var fairness = false + var corePoolSize = -1 + var maxPoolSize = -1 + var keepAlive = -1L + var rejectionPolicy = "" + + override def toString : String = { + "ThreadPoolProperties[queue=" + queue + + ", bound=" + bound + + ", capacity=" + capacity + + ", fairness=" + fairness + + ", corePoolSize=" + corePoolSize + + ", maxPoolSize=" + maxPoolSize + + ", keepAlive=" + keepAlive + + ", policy=" + rejectionPolicy + "]" + } +} diff --git a/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala b/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala index 7134675af1..172e38dbfc 100644 --- a/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala +++ b/akka-spring/src/main/scala/SupervisionBeanDefinitionParser.scala @@ -18,7 +18,7 @@ import org.springframework.util.xml.DomUtils * Parser for custom namespace for Akka declarative supervisor configuration. * @author michaelkober */ -class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectBeanDefinitionParser { +class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActiveObjectParser { /* (non-Javadoc) * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) */ diff --git a/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala index 19fe23e5c9..2bfbaf0551 100644 --- a/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectBeanDefinitionParserTest.scala @@ -12,17 +12,22 @@ import ScalaDom._ import org.w3c.dom.Element /** - * Test for ActiveObjectBeanDefinitionParser + * Test for ActiveObjectParser * @author michaelkober */ @RunWith(classOf[JUnitRunner]) class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers { - private class Parser extends ActiveObjectBeanDefinitionParser + private class Parser extends ActiveObjectParser - describe("An ActiveObjectBeanDefinitionParser") { + describe("An ActiveObjectParser") { val parser = new Parser() it("should parse the active object configuration") { - val props = parser.parseActiveObject(createTestElement); + val xml = + + val props = parser.parseActiveObject(dom(xml).getDocumentElement); assert(props != null) assert(props.timeout == 1000) assert(props.target == "foo.bar.MyPojo") @@ -30,22 +35,32 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers { } it("should throw IllegalArgumentException on missing mandatory attributes") { - evaluating { parser.parseActiveObject(createTestElement2) } should produce [IllegalArgumentException] + val xml = + + evaluating { parser.parseActiveObject(dom(xml).getDocumentElement) } should produce [IllegalArgumentException] + } + + it("should parse ActiveObjects configuration with dispatcher") { + val xml = + + + val props = parser.parseActiveObject(dom(xml).getDocumentElement); + assert(props != null) + assert(props.dispatcher.dispatcherType == "thread-based") + } + + it("should parse remote ActiveObjects configuration") { + val xml = + + + val props = parser.parseActiveObject(dom(xml).getDocumentElement); + assert(props != null) + assert(props.host == "com.some.host") + assert(props.port == 9999) } } - - private def createTestElement : Element = { - val xml = - dom(xml).getDocumentElement - } - - private def createTestElement2 : Element = { - val xml = - dom(xml).getDocumentElement - } } \ No newline at end of file diff --git a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala index e0eec060bf..037d5a88ef 100644 --- a/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala +++ b/akka-spring/src/test/scala/ActiveObjectFactoryBeanTest.scala @@ -29,13 +29,21 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers { assert(bean.isRemote) } + it("should create object that implements the given interface") { + bean.setInterface("com.biz.IPojo"); + assert(bean.hasInterface) + } + + it("should create an active object with dispatcher if dispatcher is set") { + val props = new DispatcherProperties() + props.dispatcherType = "executor-based-event-driven" + bean.setDispatcher(props); + assert(bean.hasDispatcher) + } + it("should return the object type") { bean.setTarget("java.lang.String") assert(bean.getObjectType == classOf[String]) } - - it("should create an active object") { - // TODO: - } } } diff --git a/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala new file mode 100644 index 0000000000..75a5cb6d7a --- /dev/null +++ b/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith +import ScalaDom._ + +/** + * Test for DispatcherBeanDefinitionParser + * @author michaelkober + */ +@RunWith(classOf[JUnitRunner]) +class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers { + + describe("A DispatcherBeanDefinitionParser") { + val parser = new DispatcherBeanDefinitionParser() + + it("should be able to parse the dispatcher configuration") { + val xml = + val props = parser.parseDispatcher(dom(xml).getDocumentElement); + assert(props != null) + assert(props.dispatcherType == "executor-based-event-driven") + assert(props.name == "myDispatcher") + } + + it("should be able to parse the thread pool configuration") { + val xml = + val props = parser.parseThreadPool(dom(xml).getDocumentElement); + assert(props != null) + assert(props.queue == "bounded-array-blocking-queue") + assert(props.capacity == 100) + assert(props.fairness) + assert(props.corePoolSize == 6) + assert(props.maxPoolSize == 40) + assert(props.keepAlive == 2000L) + assert(props.rejectionPolicy == "caller-runs-policy") + } + + it("should be able to parse the dispatcher with a thread pool configuration") { + val xml = + + + val props = parser.parseDispatcher(dom(xml).getDocumentElement); + assert(props != null) + assert(props.dispatcherType == "reactor-based-thread-pool-event-driven") + assert(props.name == "myDispatcher") + assert(props.threadPool.corePoolSize == 2) + assert(props.threadPool.maxPoolSize == 10) + assert(props.threadPool.keepAlive == 1000) + assert(props.threadPool.queue == "linked-blocking-queue") + } + + it("should throw IllegalArgumentException on not existing reference") { + val xml = + evaluating { parser.parseDispatcher(dom(xml).getDocumentElement) } should produce [IllegalArgumentException] + } + + it("should throw IllegalArgumentException on missing mandatory attributes") { + val xml = + evaluating { parser.parseDispatcher(dom(xml).getDocumentElement) } should produce [IllegalArgumentException] + } + + it("should throw IllegalArgumentException when configuring a single thread dispatcher with a thread pool") { + val xml = + + + evaluating { parser.parseDispatcher(dom(xml).getDocumentElement) } should produce [IllegalArgumentException] + } + + + it("should throw IllegalArgumentException when configuring a thread based dispatcher without ActiveObject") { + val xml = + evaluating { parser.parseDispatcher(dom(xml).getDocumentElement) } should produce [IllegalArgumentException] + } + } +} + + diff --git a/akka-spring/src/test/scala/DispatcherFactoryBeanTest.scala b/akka-spring/src/test/scala/DispatcherFactoryBeanTest.scala new file mode 100644 index 0000000000..3453fb5200 --- /dev/null +++ b/akka-spring/src/test/scala/DispatcherFactoryBeanTest.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith +import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.dispatch.MessageDispatcher + +@RunWith(classOf[JUnitRunner]) +class DispatcherFactoryBeanTest extends Spec with ShouldMatchers { + + describe("A DispatcherFactoryBean") { + val bean = new DispatcherFactoryBean + it("should have java getters and setters for the dispatcher properties") { + val props = new DispatcherProperties() + bean.setProperties(props) + assert(bean.getProperties == props) + } + + it("should return the object type MessageDispatcher") { + assert(bean.getObjectType == classOf[MessageDispatcher]) + } + } +}