From 42185955fa8c1aa3a74ca0cfd4eb2f2afa57a8be Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 25 May 2010 21:53:49 +0200 Subject: [PATCH 01/12] Initial attempt at fixing akka rest --- .../actor/BootableActorLoaderService.scala | 10 +- akka-core/src/main/scala/remote/Cluster.scala | 97 ++++++++----------- .../main/scala/ActorComponentProvider.scala | 29 ------ .../scala/ActorComponentProviderFactory.scala | 20 ---- akka-http/src/main/scala/AkkaServlet.scala | 19 ++-- config/akka-reference.conf | 1 + 6 files changed, 60 insertions(+), 116 deletions(-) delete mode 100644 akka-http/src/main/scala/ActorComponentProvider.scala delete mode 100644 akka-http/src/main/scala/ActorComponentProviderFactory.scala diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index de0d174fc7..4d98820c8c 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -11,6 +11,13 @@ import java.util.jar.JarFile import se.scalablesolutions.akka.util.{Bootable, Logging} import se.scalablesolutions.akka.config.Config._ +object ActorModules { + import java.util.concurrent.atomic.AtomicReference + private val _loader = new AtomicReference[Option[ClassLoader]](None) + def loader_? = _loader.get + private[actor] def loader_?(cl : Option[ClassLoader]) = _loader.set(cl) +} + /** * Handles all modules in the deploy directory (load and unload) */ @@ -46,15 +53,14 @@ trait BootableActorLoaderService extends Bootable with Logging { log.debug("Loading dependencies [%s]", dependencyJars) val allJars = toDeploy ::: dependencyJars - val parentClassLoader = classOf[Seq[_]].getClassLoader URLClassLoader.newInstance( allJars.toArray.asInstanceOf[Array[URL]], ClassLoader.getSystemClassLoader) - //parentClassLoader) } else getClass.getClassLoader) } abstract override def onLoad = { + ActorModules.loader_?(applicationLoader) for (loader <- applicationLoader; clazz <- BOOT_CLASSES) { log.info("Loading boot class [%s]", clazz) loader.loadClass(clazz).newInstance diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 948cb07a8b..66f7b59baa 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -23,25 +23,6 @@ trait Cluster { */ def name: String - /** - * Adds the specified hostname + port as a local node - * This information will be propagated to other nodes in the cluster - * and will be available at the other nodes through lookup and foreach - */ - def registerLocalNode(hostname: String, port: Int): Unit - - /** - * Removes the specified hostname + port from the local node - * This information will be propagated to other nodes in the cluster - * and will no longer be available at the other nodes through lookup and foreach - */ - def deregisterLocalNode(hostname: String, port: Int): Unit - - /** - * Sends the message to all Actors of the specified type on all other nodes in the cluster - */ - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit - /** * Traverses all known remote addresses avaiable at all other nodes in the cluster * and applies the given PartialFunction on the first address that it's defined at @@ -65,8 +46,6 @@ trait ClusterActor extends Actor with Cluster { val name = config.getString("akka.remote.cluster.name", "default") @volatile protected var serializer : Serializer = _ - - private[remote] def setSerializer(s : Serializer) : Unit = serializer = s } /** @@ -87,6 +66,7 @@ private[akka] object ClusterActor { private[akka] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage private[akka] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage private[akka] case class Node(endpoints: List[RemoteAddress]) + private[akka] case class InitClusterActor(serializer : Serializer) } /** @@ -168,6 +148,10 @@ abstract class BasicClusterActor extends ClusterActor with Logging { local = Node(local.endpoints.filterNot(_ == s)) broadcast(Papers(local.endpoints)) } + + case InitClusterActor(s) => { + serializer = s + } } /** @@ -206,24 +190,6 @@ abstract class BasicClusterActor extends ClusterActor with Logging { * Applies the given function to all remote addresses known */ def foreach(f: (RemoteAddress) => Unit): Unit = remotes.valuesIterator.toList.flatMap(_.endpoints).foreach(f) - - /** - * Registers a local endpoint - */ - def registerLocalNode(hostname: String, port: Int): Unit = - self ! RegisterLocalNode(RemoteAddress(hostname, port)) - - /** - * Deregisters a local endpoint - */ - def deregisterLocalNode(hostname: String, port: Int): Unit = - self ! DeregisterLocalNode(RemoteAddress(hostname, port)) - - /** - * Broadcasts the specified message to all Actors of type Class on all known Nodes - */ - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = - self ! RelayedMessage(to.getName, msg) } /** @@ -232,28 +198,22 @@ abstract class BasicClusterActor extends ClusterActor with Logging { * Loads a specified ClusterActor and delegates to that instance. */ object Cluster extends Cluster with Logging { + //Import messages + import ClusterActor._ + lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName lazy val DEFAULT_CLUSTER_ACTOR_CLASS_NAME = classOf[JGroupsClusterActor].getName - @volatile private[remote] var clusterActor: Option[ClusterActor] = None @volatile private[remote] var clusterActorRef: Option[ActorRef] = None + @volatile private[akka] var classLoader : Option[ClassLoader] = Some(getClass.getClassLoader) - private[remote] def createClusterActor(loader: ClassLoader): Option[ActorRef] = { + private[remote] def createClusterActor(): Option[ActorRef] = { val name = config.getString("akka.remote.cluster.actor", DEFAULT_CLUSTER_ACTOR_CLASS_NAME) if (name.isEmpty) throw new IllegalArgumentException( "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") - val serializer = Class.forName(config.getString( - "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) - .newInstance.asInstanceOf[Serializer] - serializer.classLoader = Some(loader) - try { - Some(Actor.actorOf { - val a = Class.forName(name).newInstance.asInstanceOf[ClusterActor] - a setSerializer serializer - a - }) + Some(Actor.actorOf(Class.forName(name).newInstance.asInstanceOf[ClusterActor])) } catch { case e => log.error(e, "Couldn't load Cluster provider: [%s]", name) @@ -267,15 +227,27 @@ object Cluster extends Cluster with Logging { RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), Supervise(actor, LifeCycle(Permanent)) :: Nil))) + private[this] def clusterActor = if(clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor]) + def name = clusterActor.map(_.name).getOrElse("No cluster") def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf)) - def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.registerLocalNode(hostname, port)) + /**Adds the specified hostname + port as a local node + * This information will be propagated to other nodes in the cluster + * and will be available at the other nodes through lookup and foreach + */ + def registerLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! RegisterLocalNode(RemoteAddress(hostname, port))) - def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port)) + /**Removes the specified hostname + port from the local node + * This information will be propagated to other nodes in the cluster + * and will no longer be available at the other nodes through lookup and foreach + */ + def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! DeregisterLocalNode(RemoteAddress(hostname, port))) - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg)) + /**Sends the message to all Actors of the specified type on all other nodes in the cluster + */ + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActorRef.foreach(_ ! RelayedMessage(to.getName, msg)) def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f)) @@ -283,14 +255,21 @@ object Cluster extends Cluster with Logging { def start(serializerClassLoader: Option[ClassLoader]): Unit = synchronized { log.info("Starting up Cluster Service...") - if (clusterActor.isEmpty) { + if (clusterActorRef.isEmpty) { for { - actorRef <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader) + actorRef <- createClusterActor() sup <- createSupervisor(actorRef) } { - clusterActorRef = Some(actorRef.start) - clusterActor = Some(actorRef.actor.asInstanceOf[ClusterActor]) + val serializer = Class.forName(config.getString( + "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) + .newInstance.asInstanceOf[Serializer] + + classLoader = serializerClassLoader orElse classLoader + serializer.classLoader = classLoader + actorRef.start sup.start + actorRef ! InitClusterActor(serializer) + clusterActorRef = Some(actorRef) } } } @@ -301,6 +280,6 @@ object Cluster extends Cluster with Logging { c <- clusterActorRef s <- c.supervisor } s.stop - clusterActor = None + classLoader = Some(getClass.getClassLoader) } } diff --git a/akka-http/src/main/scala/ActorComponentProvider.scala b/akka-http/src/main/scala/ActorComponentProvider.scala deleted file mode 100644 index 52512b6929..0000000000 --- a/akka-http/src/main/scala/ActorComponentProvider.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.rest - -import com.sun.jersey.core.spi.component.ComponentScope -import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider - -import se.scalablesolutions.akka.config.Configurator -import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.actor.Actor - -class ActorComponentProvider(val clazz: Class[_], val configurators: List[Configurator]) - extends IoCFullyManagedComponentProvider with Logging { - - override def getScope = ComponentScope.Singleton - - override def getInstance: AnyRef = { - val instances = for { - conf <- configurators - if conf.isDefined(clazz) - instance <- conf.getInstance(clazz) - } yield instance - if (instances.isEmpty) throw new IllegalArgumentException( - "No Actor or Active Object for class [" + clazz + "] could be found.\nMake sure you have defined and configured the class as an Active Object or Actor in a supervisor hierarchy.") - else instances.head.asInstanceOf[AnyRef] - } -} diff --git a/akka-http/src/main/scala/ActorComponentProviderFactory.scala b/akka-http/src/main/scala/ActorComponentProviderFactory.scala deleted file mode 100644 index e1ea94347f..0000000000 --- a/akka-http/src/main/scala/ActorComponentProviderFactory.scala +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.rest - -import com.sun.jersey.core.spi.component.ioc.{IoCComponentProvider,IoCComponentProviderFactory} -import com.sun.jersey.core.spi.component.{ComponentContext} - -import se.scalablesolutions.akka.config.Configurator -import se.scalablesolutions.akka.util.Logging - -class ActorComponentProviderFactory(val configurators: List[Configurator]) -extends IoCComponentProviderFactory with Logging { - override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz) - - override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = { - configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null) - } -} diff --git a/akka-http/src/main/scala/AkkaServlet.scala b/akka-http/src/main/scala/AkkaServlet.scala index aecdaa9671..6a53e5629f 100644 --- a/akka-http/src/main/scala/AkkaServlet.scala +++ b/akka-http/src/main/scala/AkkaServlet.scala @@ -4,9 +4,8 @@ package se.scalablesolutions.akka.rest -import se.scalablesolutions.akka.config.ConfiguratorRepository import se.scalablesolutions.akka.config.Config.config - +import se.scalablesolutions.akka.actor.ActorModules import com.sun.jersey.api.core.ResourceConfig import com.sun.jersey.spi.container.servlet.ServletContainer import com.sun.jersey.spi.container.WebApplication @@ -21,13 +20,21 @@ class AkkaServlet extends ServletContainer { import scala.collection.JavaConversions._ override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = { - val configurators = ConfiguratorRepository.getConfigurators - - resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces)) + resourceConfig.getProperties.put( + "com.sun.jersey.config.property.packages", + config.getList("akka.rest.resource_packages").mkString(";") + ) resourceConfig.getProperties.put( "com.sun.jersey.spi.container.ResourceFilters", config.getList("akka.rest.filters").mkString(",")) - webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators)) + val cl = Thread.currentThread.getContextClassLoader + Thread.currentThread.setContextClassLoader(ActorModules.loader_?.getOrElse(cl)) + try { + webApplication.initiate(resourceConfig) + } + finally{ + Thread.currentThread.setContextClassLoader(cl) + } } } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index b61bdd773f..84fee17176 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -50,6 +50,7 @@ hostname = "localhost" port = 9998 filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use + resource_packages = ["sample.rest.scala","ample.rest.java"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) #IF you are using a KerberosAuthenticationActor From f6c1cbf3680e21dd933a629cbc42302fb36b1433 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 26 May 2010 23:00:31 +0200 Subject: [PATCH 02/12] Elaborated on classloader handling --- .../actor/BootableActorLoaderService.scala | 13 ++++------- .../src/main/scala/AkkaCometServlet.scala | 20 ++++++++-------- akka-http/src/main/scala/AkkaServlet.scala | 10 +------- .../src/main/scala/EmbeddedAppServer.scala | 23 +++++++++++++++---- 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index 4d98820c8c..1f8b76bfc9 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -11,13 +11,6 @@ import java.util.jar.JarFile import se.scalablesolutions.akka.util.{Bootable, Logging} import se.scalablesolutions.akka.config.Config._ -object ActorModules { - import java.util.concurrent.atomic.AtomicReference - private val _loader = new AtomicReference[Option[ClassLoader]](None) - def loader_? = _loader.get - private[actor] def loader_?(cl : Option[ClassLoader]) = _loader.set(cl) -} - /** * Handles all modules in the deploy directory (load and unload) */ @@ -60,7 +53,6 @@ trait BootableActorLoaderService extends Bootable with Logging { } abstract override def onLoad = { - ActorModules.loader_?(applicationLoader) for (loader <- applicationLoader; clazz <- BOOT_CLASSES) { log.info("Loading boot class [%s]", clazz) loader.loadClass(clazz).newInstance @@ -68,5 +60,8 @@ trait BootableActorLoaderService extends Bootable with Logging { super.onLoad } - abstract override def onUnload = ActorRegistry.shutdownAll + abstract override def onUnload = { + super.onUnload + ActorRegistry.shutdownAll + } } diff --git a/akka-http/src/main/scala/AkkaCometServlet.scala b/akka-http/src/main/scala/AkkaCometServlet.scala index 0ce6f8312a..73590fe9f6 100644 --- a/akka-http/src/main/scala/AkkaCometServlet.scala +++ b/akka-http/src/main/scala/AkkaCometServlet.scala @@ -15,15 +15,7 @@ import org.atmosphere.container.GrizzlyCometSupport import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} -/** - * Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a - * standard servlet container, e.g. not using the Akka Kernel. - *

- * Used by the Akka Kernel to bootstrap REST and Comet. - */ -class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging { - val servlet = new RestServlet with AtmosphereServletProcessor { - +class AtmosphereRestServlet extends RestServlet with AtmosphereServletProcessor { //Delegate to implement the behavior for AtmosphereHandler private val handler = new AbstractReflectorAtmosphereHandler { override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) { @@ -44,6 +36,16 @@ class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging { } } +/** + * Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a + * standard servlet container, e.g. not using the Akka Kernel. + *

+ * Used by the Akka Kernel to bootstrap REST and Comet. + */ +class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging { + lazy val servlet = createRestServlet + + protected def createRestServlet : AtmosphereRestServlet = new AtmosphereRestServlet /** * We override this to avoid Atmosphere looking for it's atmosphere.xml file * Instead we specify what semantics we want in code. diff --git a/akka-http/src/main/scala/AkkaServlet.scala b/akka-http/src/main/scala/AkkaServlet.scala index 6a53e5629f..b9dd9a652d 100644 --- a/akka-http/src/main/scala/AkkaServlet.scala +++ b/akka-http/src/main/scala/AkkaServlet.scala @@ -5,7 +5,6 @@ package se.scalablesolutions.akka.rest import se.scalablesolutions.akka.config.Config.config -import se.scalablesolutions.akka.actor.ActorModules import com.sun.jersey.api.core.ResourceConfig import com.sun.jersey.spi.container.servlet.ServletContainer import com.sun.jersey.spi.container.WebApplication @@ -28,13 +27,6 @@ class AkkaServlet extends ServletContainer { "com.sun.jersey.spi.container.ResourceFilters", config.getList("akka.rest.filters").mkString(",")) - val cl = Thread.currentThread.getContextClassLoader - Thread.currentThread.setContextClassLoader(ActorModules.loader_?.getOrElse(cl)) - try { - webApplication.initiate(resourceConfig) - } - finally{ - Thread.currentThread.setContextClassLoader(cl) - } + webApplication.initiate(resourceConfig) } } diff --git a/akka-kernel/src/main/scala/EmbeddedAppServer.scala b/akka-kernel/src/main/scala/EmbeddedAppServer.scala index a49d6276d1..f7d5325626 100644 --- a/akka-kernel/src/main/scala/EmbeddedAppServer.scala +++ b/akka-kernel/src/main/scala/EmbeddedAppServer.scala @@ -9,10 +9,12 @@ import com.sun.grizzly.http.servlet.ServletAdapter import com.sun.grizzly.standalone.StaticStreamAlgorithm import javax.ws.rs.core.UriBuilder +import com.sun.jersey.api.core.ResourceConfig +import com.sun.jersey.spi.container.WebApplication import se.scalablesolutions.akka.actor.BootableActorLoaderService import se.scalablesolutions.akka.util.{Bootable, Logging} -import se.scalablesolutions.akka.comet.AkkaServlet +import se.scalablesolutions.akka.comet.{ AkkaServlet, AtmosphereRestServlet } /** * Handles the Akka Comet Support (load/unload) @@ -42,7 +44,19 @@ trait EmbeddedAppServer extends Bootable with Logging { val adapter = new ServletAdapter adapter.setHandleStaticResources(true) - adapter.setServletInstance(new AkkaServlet) + adapter.setServletInstance(new AkkaServlet { + override def createRestServlet = new AtmosphereRestServlet { + override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = { + val cl = Thread.currentThread.getContextClassLoader + try { + Thread.currentThread.setContextClassLoader(applicationLoader.get) + super.initiate(resourceConfig,webApplication) + } finally { + Thread.currentThread.setContextClassLoader(cl) + } + } + } + }) adapter.setContextPath(uri.getPath) adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root") @@ -65,9 +79,10 @@ trait EmbeddedAppServer extends Bootable with Logging { abstract override def onUnload = { super.onUnload - if (jerseySelectorThread.isDefined) { + jerseySelectorThread foreach { (t) => { log.info("Shutting down REST service (Jersey)") - jerseySelectorThread.get.stopEndpoint + t.stopEndpoint + } } } } From 8d5e685ad61ec74bf15560d2b9b6592b8a71f3e6 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 26 May 2010 23:39:50 +0200 Subject: [PATCH 03/12] Tweaking akka-reference.conf --- config/akka-reference.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 84fee17176..18a2701b8c 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -8,7 +8,7 @@ filename = "./logs/akka.log" roll = "daily" # Options: never, hourly, daily, sunday/monday/... - level = "debug" # Options: fatal, critical, error, warning, info, debug, trace + level = "trace" # Options: fatal, critical, error, warning, info, debug, trace console = on # syslog_host = "" # syslog_server_name = "" @@ -50,7 +50,7 @@ hostname = "localhost" port = 9998 filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use - resource_packages = ["sample.rest.scala","ample.rest.java"] # List with all resource packages for your Jersey services + resource_packages = ["sample.rest.scala","sample.rest.java"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) #IF you are using a KerberosAuthenticationActor From f4298eb5c01b11c0cae064975e28375e1c2b5886 Mon Sep 17 00:00:00 2001 From: Heiko Seeberger Date: Fri, 28 May 2010 20:24:00 +0200 Subject: [PATCH 04/12] re #247: Removed all vals for repositories except for embeddedRepo. Introduced module configurations necessary for akka-core; other modules still missing. --- project/build/AkkaProject.scala | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 1f7bd27a94..2cb0f5f265 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -40,20 +40,29 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val dist = zipTask(allArtifacts, "dist", distName) dependsOn (`package`) describedAs("Zips up the distribution.") - // ------------------------------------------------------------ - // repositories - val embeddedrepo = "embedded repo" at (info.projectPath / "embedded-repo").asURL.toString + // ------------------------------------------------------------------------------------------------------------------- + // Repositories + // Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases) + // must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action. + // Therefore, if repositories are defined this must happen as def, not as val. + // ------------------------------------------------------------------------------------------------------------------- + val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here! + val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) + def guiceyFruitRepo = "GuiceyFruit Repo" at "http://guiceyfruit.googlecode.com/svn/repo/releases/" + val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", guiceyFruitRepo) + def jbossRepo = "JBoss Repo" at "https://repository.jboss.org/nexus/content/groups/public/" + val jbossModuleConfig = ModuleConfiguration("org.jboss", jbossRepo) + val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", jbossRepo) + val jgroupsModuleConfig = ModuleConfiguration("jgroups", jbossRepo) + /* val sunjdmk = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo" val databinder = "DataBinder" at "http://databinder.net/repo" // val configgy = "Configgy" at "http://www.lag.net/repo" val codehaus = "Codehaus" at "http://repository.codehaus.org" val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org" - val jboss = "jBoss" at "https://repository.jboss.org/nexus/content/groups/public/" - val guiceyfruit = "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/" val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository" val java_net = "java.net" at "http://download.java.net/maven/2" - val scala_tools_snapshots = "scala-tools snapshots" at "http://scala-tools.org/repo-snapshots" - val scala_tools_releases = "scala-tools releases" at "http://scala-tools.org/repo-releases" + */ // ------------------------------------------------------------ // project defintions From b5f4cfae2545ac06e09608498b41c70e7d16868b Mon Sep 17 00:00:00 2001 From: Heiko Seeberger Date: Fri, 28 May 2010 20:52:09 +0200 Subject: [PATCH 05/12] re #247: Added module configuration for akka-persistence-cassandra. Attention: Necessary to delete .ivy2 directory! --- project/build/AkkaProject.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 2cb0f5f265..cb0d2ced2a 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -54,8 +54,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val jbossModuleConfig = ModuleConfiguration("org.jboss", jbossRepo) val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", jbossRepo) val jgroupsModuleConfig = ModuleConfiguration("jgroups", jbossRepo) + def sunjdmkRepo = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo" + val jmsModuleConfig = ModuleConfiguration("javax.jms", sunjdmkRepo) + val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", sunjdmkRepo) + val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", sunjdmkRepo) /* - val sunjdmk = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo" val databinder = "DataBinder" at "http://databinder.net/repo" // val configgy = "Configgy" at "http://www.lag.net/repo" val codehaus = "Codehaus" at "http://repository.codehaus.org" From b94f584fcf2640029e4b7b9d90439c60c9d9de74 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 28 May 2010 22:05:17 +0200 Subject: [PATCH 06/12] ClassLoader issue --- .../src/main/scala/actor/BootableActorLoaderService.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index de0d174fc7..0f49a0b6cb 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -49,9 +49,9 @@ trait BootableActorLoaderService extends Bootable with Logging { val parentClassLoader = classOf[Seq[_]].getClassLoader URLClassLoader.newInstance( allJars.toArray.asInstanceOf[Array[URL]], - ClassLoader.getSystemClassLoader) + Thread.currentThread.getContextClassLoader) //parentClassLoader) - } else getClass.getClassLoader) + } else Thread.currentThread.getContextClassLoader) } abstract override def onLoad = { From d587d1a5fe3d0e9975e8cc3d388190cdf6117210 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 28 May 2010 22:05:44 +0200 Subject: [PATCH 07/12] Fixing sbt run (exclude slf4j 1.5.11) --- project/build/AkkaProject.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 1f7bd27a94..ec55352ed6 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -106,6 +106,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { " dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version) ) + //Exclude slf4j1.5.11 from the classpath, it's conflicting... + override def runClasspath = super.runClasspath --- (super.runClasspath ** "slf4j*1.5.11.jar") + // ------------------------------------------------------------ // publishing override def managedStyle = ManagedStyle.Maven From 2a060c9fa37227b5584905897a79c33929c0a9f3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 29 May 2010 00:01:41 +0200 Subject: [PATCH 08/12] Ported samples rest scala to the new akka-http --- .../src/main/scala/SimpleService.scala | 101 ++++++++++-------- 1 file changed, 59 insertions(+), 42 deletions(-) diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index 6e67772f7e..e8f3576e9f 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -11,12 +11,12 @@ import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.comet.AkkaClusterBroadcastFilter - +import scala.xml.NodeSeq import java.lang.Integer import java.nio.ByteBuffer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} - +import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor import org.atmosphere.annotation.{Broadcast, Suspend,Cluster} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} @@ -27,16 +27,13 @@ class Boot { SupervisorConfig( RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])), Supervise( - actorOf[SimpleService], + actorOf[SimpleServiceActor], LifeCycle(Permanent)) :: Supervise( - actorOf[Chat], + actorOf[ChatActor], LifeCycle(Permanent)) :: Supervise( - actorOf[PersistentSimpleService], - LifeCycle(Permanent)) :: - Supervise( - actorOf[PubSub], + actorOf[PersistentSimpleServiceActor], LifeCycle(Permanent)) :: Nil)) factory.newInstance.start @@ -50,19 +47,25 @@ class Boot { * Or browse to the URL from a web browser. */ @Path("/scalacount") -class SimpleService extends Transactor { - - case object Tick +class SimpleService { + @GET + @Produces(Array("text/html")) + def count = { + //Fetch the first actor of type SimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption + r <- a.!![NodeSeq]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter + } +} +class SimpleServiceActor extends Transactor { private val KEY = "COUNTER" private var hasStartedTicking = false private lazy val storage = TransactionalState.newMap[String, Integer] - @GET - @Produces(Array("text/html")) - def count = (self !! Tick).getOrElse(Error in counter) - def receive = { - case Tick => if (hasStartedTicking) { + case "Tick" => if (hasStartedTicking) { val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue storage.put(KEY, new Integer(counter + 1)) self.reply(Tick:{counter + 1}) @@ -75,9 +78,7 @@ class SimpleService extends Transactor { } @Path("/pubsub/") -class PubSub extends Actor { - case class Msg(topic: String, message: String) - +class PubSub { @GET @Suspend @Produces(Array("text/plain;charset=ISO-8859-1")) @@ -90,8 +91,6 @@ class PubSub extends Actor { @Produces(Array("text/plain;charset=ISO-8859-1")) //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "foo") def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic) - - def receive = { case _ => } } /** @@ -102,19 +101,26 @@ class PubSub extends Actor { * Or browse to the URL from a web browser. */ @Path("/persistentscalacount") -class PersistentSimpleService extends Transactor { +class PersistentSimpleService { + @GET + @Produces(Array("text/html")) + def count = { + //Fetch the first actor of type PersistentSimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption + r <- a.!![NodeSeq]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter + } +} - case object Tick +class PersistentSimpleServiceActor extends Transactor { private val KEY = "COUNTER" private var hasStartedTicking = false private lazy val storage = CassandraStorage.newMap - @GET - @Produces(Array("text/html")) - def count = (self !! Tick).getOrElse(Error in counter) - def receive = { - case Tick => if (hasStartedTicking) { + case "Tick" => if (hasStartedTicking) { val bytes = storage.get(KEY.getBytes).get val counter = ByteBuffer.wrap(bytes).getInt storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) @@ -128,16 +134,37 @@ class PersistentSimpleService extends Transactor { } @Path("/chat") -class Chat extends Actor with Logging { - case class Chat(val who: String, val what: String, val msg: String) - +class Chat { + import ChatActor.ChatMsg @Suspend @GET @Produces(Array("text/html")) def suspend = () + @POST + @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) + //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar") + @Consumes(Array("application/x-www-form-urlencoded")) + @Produces(Array("text/html")) + def publishMessage(form: MultivaluedMap[String, String]) = { + val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message")) + //Fetch the first actor of type ChatActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[ChatActor]).headOption + r <- a.!![String](msg)} yield r + //Return either the resulting String or a default one + result getOrElse "System__error" + } +} + +object ChatActor { + case class ChatMsg(val who: String, val what: String, val msg: String) +} + +class ChatActor extends Actor with Logging { + import ChatActor.ChatMsg def receive = { - case Chat(who, what, msg) => { + case ChatMsg(who, what, msg) => { what match { case "login" => self.reply("System Message__" + who + " has joined.") case "post" => self.reply("" + who + "__" + msg) @@ -146,16 +173,6 @@ class Chat extends Actor with Logging { } case x => log.info("recieve unknown: " + x) } - - @POST - @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) - //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar") - @Consumes(Array("application/x-www-form-urlencoded")) - @Produces(Array("text/html")) - def publishMessage(form: MultivaluedMap[String, String]) = - (self !! Chat(form.getFirst("name"), - form.getFirst("action"), - form.getFirst("message"))).getOrElse("System__error") } From d1d8773f045b5892037c623bb09b5f78035cd646 Mon Sep 17 00:00:00 2001 From: Heiko Seeberger Date: Sat, 29 May 2010 07:28:42 +0200 Subject: [PATCH 09/12] closes #247: Added all missing module configurations. --- project/build/AkkaProject.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 749557360f..d6537dc0b0 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -44,7 +44,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { // Repositories // Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases) // must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action. - // Therefore, if repositories are defined this must happen as def, not as val. + // Therefore, if repositories are defined, this must happen as def, not as val. // ------------------------------------------------------------------------------------------------------------------- val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here! val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) @@ -54,17 +54,23 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val jbossModuleConfig = ModuleConfiguration("org.jboss", jbossRepo) val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", jbossRepo) val jgroupsModuleConfig = ModuleConfiguration("jgroups", jbossRepo) - def sunjdmkRepo = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo" + def sunjdmkRepo = "Sun JDMK Repo" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo" val jmsModuleConfig = ModuleConfiguration("javax.jms", sunjdmkRepo) val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", sunjdmkRepo) val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", sunjdmkRepo) - /* + def javaNetRepo = "java.net Repo" at "http://download.java.net/maven/2" + val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", javaNetRepo) + val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", javaNetRepo) + val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", javaNetRepo) + val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", javaNetRepo) + val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) + + /* These are not needed and can possibly be deleted. val databinder = "DataBinder" at "http://databinder.net/repo" // val configgy = "Configgy" at "http://www.lag.net/repo" val codehaus = "Codehaus" at "http://repository.codehaus.org" val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org" val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository" - val java_net = "java.net" at "http://download.java.net/maven/2" */ // ------------------------------------------------------------ From b8fd1d1c31fc0c9a19bfd9a39f7c4e8735c9d9e4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 29 May 2010 15:24:06 +0200 Subject: [PATCH 10/12] Ported akka-sample-secure --- .../src/main/scala/SimpleService.scala | 30 +++++++++++-------- config/akka-reference.conf | 2 +- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala index 680aabd718..4fd5a8a63d 100644 --- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala @@ -10,6 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo} import se.scalablesolutions.akka.stm.TransactionalState +import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor class Boot { val factory = SupervisorFactory( @@ -90,12 +91,7 @@ import javax.annotation.security.{RolesAllowed, DenyAll, PermitAll} import javax.ws.rs.{GET, Path, Produces} @Path("/secureticker") -class SecureTickActor extends Transactor with Logging { - - case object Tick - private val KEY = "COUNTER" - private var hasStartedTicking = false - private lazy val storage = TransactionalState.newMap[String, Integer] +class SecureTickService { /** * allow access for any user to "/secureticker/public" @@ -123,15 +119,25 @@ class SecureTickActor extends Transactor with Logging { @DenyAll def paranoiaTick = tick - def tick = (self !! Tick) match { - case (Some(counter)) => (Tick: - {counter} - ) - case _ => (Error in counter) + def tick = { + //Fetch the first actor of type PersistentSimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption + r <- a.!![Integer]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result match { + case (Some(counter)) => (Tick: {counter}) + case _ => (Error in counter) + } } +} +class SecureTickActor extends Transactor with Logging { + private val KEY = "COUNTER" + private var hasStartedTicking = false + private lazy val storage = TransactionalState.newMap[String, Integer] def receive = { - case Tick => if (hasStartedTicking) { + case "Tick" => if (hasStartedTicking) { val counter = storage.get(KEY).get.intValue storage.put(KEY, counter + 1) self.reply(new Integer(counter + 1)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 447b4b30c9..2d8b8dc43e 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -54,7 +54,7 @@ hostname = "localhost" port = 9998 filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use - resource_packages = ["sample.rest.scala","sample.rest.java"] # List with all resource packages for your Jersey services + resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) #IF you are using a KerberosAuthenticationActor From 2dd2118179093037f80f81ebc024d01f80b11c44 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 29 May 2010 15:40:52 +0200 Subject: [PATCH 11/12] Prepare for master merge --- .../scalablesolutions/akka/api/RestTest.java | 84 ------------------- 1 file changed, 84 deletions(-) delete mode 100644 akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java deleted file mode 100644 index d6d4ae9060..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.api; - -import com.sun.grizzly.http.SelectorThread; -import com.sun.grizzly.http.servlet.ServletAdapter; -import com.sun.grizzly.tcp.Adapter; -import com.sun.grizzly.standalone.StaticStreamAlgorithm; - -import javax.ws.rs.core.UriBuilder; -import javax.servlet.Servlet; - -import junit.framework.TestCase; -import org.junit.*; - -import java.io.IOException; -import java.net.URI; - -import se.scalablesolutions.akka.config.*; -import static se.scalablesolutions.akka.config.JavaConfig.*; - - -public class RestTest extends TestCase { - - private static int PORT = 9998; - private static URI URI = UriBuilder.fromUri("http://localhost/").port(PORT).build(); - private static SelectorThread selector = null; - private static ActiveObjectConfigurator conf = new ActiveObjectConfigurator(); - - @BeforeClass - protected void setUp() { - conf.configure( - new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}), - new Component[] { - new Component( - JerseyFoo.class, - new LifeCycle(new Permanent()), - 10000000) - }).inject().supervise(); - selector = startJersey(); - } - - public void testSimpleRequest() { - assertTrue(true); - } - -/* - - @Test - public void testSimpleRequest() throws IOException, InstantiationException { - selector.listen(); - Client client = Client.create(); - WebResource webResource = client.resource(URI); - String responseMsg = webResource.path("/foo").get(String.class); - assertEquals("hello foo", responseMsg); - selector.stopEndpoint(); - } -*/ - private static SelectorThread startJersey() { - try { - Servlet servlet = new se.scalablesolutions.akka.rest.AkkaServlet(); - ServletAdapter adapter = new ServletAdapter(); - adapter.setServletInstance(servlet); - adapter.setContextPath(URI.getPath()); - return createGrizzlySelector(adapter, URI, PORT); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static SelectorThread createGrizzlySelector(Adapter adapter, URI uri, int port) throws IOException, InstantiationException { - final String scheme = uri.getScheme(); - if (!scheme.equalsIgnoreCase("http")) - throw new IllegalArgumentException("The URI scheme, of the URI " + uri + ", must be equal (ignoring case) to 'http'"); - final SelectorThread selectorThread = new SelectorThread(); - selectorThread.setAlgorithmClassName(StaticStreamAlgorithm.class.getName()); - selectorThread.setPort(port); - selectorThread.setAdapter(adapter); - return selectorThread; - } -} - From 2d2bdda33007a02de1c4a44164846f94a812c3ab Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Sun, 30 May 2010 08:22:23 +0200 Subject: [PATCH 12/12] Upgrade to Camel 2.3.0 --- .../src/test/scala/CamelContextLifecycleTest.scala | 4 +++- .../akka-sample-camel/src/main/scala/Boot.scala | 12 +++++------- project/build/AkkaProject.scala | 9 ++++----- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/akka-camel/src/test/scala/CamelContextLifecycleTest.scala b/akka-camel/src/test/scala/CamelContextLifecycleTest.scala index b8caea0805..cf558ec8d9 100644 --- a/akka-camel/src/test/scala/CamelContextLifecycleTest.scala +++ b/akka-camel/src/test/scala/CamelContextLifecycleTest.scala @@ -13,7 +13,9 @@ class CamelContextLifecycleTest extends JUnitSuite with CamelContextLifecycle { init(ctx) assert(context.isStreamCaching === true) assert(!context.asInstanceOf[TestCamelContext].isStarted) - assert(!template.asInstanceOf[DefaultProducerTemplate].isStarted) + // In Camel 2.3 CamelComtext.createProducerTemplate starts + // the template before returning it (wasn't started in 2.2) + assert(template.asInstanceOf[DefaultProducerTemplate].isStarted) start assert(context.asInstanceOf[TestCamelContext].isStarted) assert(template.asInstanceOf[DefaultProducerTemplate].isStarted) diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index 810f31aba5..5b08e15a1a 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -45,21 +45,19 @@ class Boot { // Publish subscribe example // - // Cometd example is disabled because of unresolved sbt/ivy dependency resolution issues. - // If you want to run this example, make sure to replace all jetty-*-6.1.22.jar files - // on the classpath with corresponding jetty-*-6.1.11.jar files. + // Cometd example commented out because camel-cometd is broken in Camel 2.3 // - //val cometdUri = "cometd://localhost:8111/test/abc?resourceBase=target" - //val cometdSubscriber = new Subscriber("cometd-subscriber", cometdUri).start - //val cometdPublisher = new Publisher("cometd-publisher", cometdUri).start + //val cometdUri = "cometd://localhost:8111/test/abc?baseResource=file:target" + //val cometdSubscriber = actorOf(new Subscriber("cometd-subscriber", cometdUri)).start + //val cometdPublisher = actorOf(new Publisher("cometd-publisher", cometdUri)).start val jmsUri = "jms:topic:test" val jmsSubscriber1 = actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start val jmsSubscriber2 = actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start val jmsPublisher = actorOf(new Publisher("jms-publisher", jmsUri)).start - //val cometdPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher).start + //val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 0d782c3459..daf43c2ca4 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -234,7 +234,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { - val camel_core = "org.apache.camel" % "camel-core" % "2.2.0" % "compile" + val camel_core = "org.apache.camel" % "camel-core" % "2.3.0" % "compile" } class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { @@ -337,11 +337,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) { - val commons_codec = "commons-codec" % "commons-codec" % "1.3" % "compile" val spring_jms = "org.springframework" % "spring-jms" % "3.0.1.RELEASE" % "compile" - val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.2.0" % "compile" - val camel_jms = "org.apache.camel" % "camel-jms" % "2.2.0" % "compile" - val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.0" % "compile" + val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.3.0" % "compile" + val camel_jms = "org.apache.camel" % "camel-jms" % "2.3.0" % "compile" + val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.2" % "compile" } class AkkaSampleSecurityProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) {