Merge branch 'master' of git@github.com:jboner/akka

Conflicts:
	config/akka-reference.conf
This commit is contained in:
Jonas Bonér 2010-05-30 12:52:33 +02:00
commit ce064cc4c1
14 changed files with 208 additions and 321 deletions

View file

@ -13,7 +13,9 @@ class CamelContextLifecycleTest extends JUnitSuite with CamelContextLifecycle {
init(ctx)
assert(context.isStreamCaching === true)
assert(!context.asInstanceOf[TestCamelContext].isStarted)
assert(!template.asInstanceOf[DefaultProducerTemplate].isStarted)
// In Camel 2.3 CamelComtext.createProducerTemplate starts
// the template before returning it (wasn't started in 2.2)
assert(template.asInstanceOf[DefaultProducerTemplate].isStarted)
start
assert(context.asInstanceOf[TestCamelContext].isStarted)
assert(template.asInstanceOf[DefaultProducerTemplate].isStarted)

View file

@ -46,12 +46,11 @@ trait BootableActorLoaderService extends Bootable with Logging {
log.debug("Loading dependencies [%s]", dependencyJars)
val allJars = toDeploy ::: dependencyJars
val parentClassLoader = classOf[Seq[_]].getClassLoader
URLClassLoader.newInstance(
allJars.toArray.asInstanceOf[Array[URL]],
ClassLoader.getSystemClassLoader)
Thread.currentThread.getContextClassLoader)
//parentClassLoader)
} else getClass.getClassLoader)
} else Thread.currentThread.getContextClassLoader)
}
abstract override def onLoad = {
@ -62,5 +61,8 @@ trait BootableActorLoaderService extends Bootable with Logging {
super.onLoad
}
abstract override def onUnload = ActorRegistry.shutdownAll
abstract override def onUnload = {
super.onUnload
ActorRegistry.shutdownAll
}
}

View file

@ -23,25 +23,6 @@ trait Cluster {
*/
def name: String
/**
* Adds the specified hostname + port as a local node
* This information will be propagated to other nodes in the cluster
* and will be available at the other nodes through lookup and foreach
*/
def registerLocalNode(hostname: String, port: Int): Unit
/**
* Removes the specified hostname + port from the local node
* This information will be propagated to other nodes in the cluster
* and will no longer be available at the other nodes through lookup and foreach
*/
def deregisterLocalNode(hostname: String, port: Int): Unit
/**
* Sends the message to all Actors of the specified type on all other nodes in the cluster
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
/**
* Traverses all known remote addresses avaiable at all other nodes in the cluster
* and applies the given PartialFunction on the first address that it's defined at
@ -65,8 +46,6 @@ trait ClusterActor extends Actor with Cluster {
val name = config.getString("akka.remote.cluster.name", "default")
@volatile protected var serializer : Serializer = _
private[remote] def setSerializer(s : Serializer) : Unit = serializer = s
}
/**
@ -87,6 +66,7 @@ private[akka] object ClusterActor {
private[akka] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
private[akka] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
private[akka] case class Node(endpoints: List[RemoteAddress])
private[akka] case class InitClusterActor(serializer : Serializer)
}
/**
@ -168,6 +148,10 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
local = Node(local.endpoints.filterNot(_ == s))
broadcast(Papers(local.endpoints))
}
case InitClusterActor(s) => {
serializer = s
}
}
/**
@ -206,24 +190,6 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
* Applies the given function to all remote addresses known
*/
def foreach(f: (RemoteAddress) => Unit): Unit = remotes.valuesIterator.toList.flatMap(_.endpoints).foreach(f)
/**
* Registers a local endpoint
*/
def registerLocalNode(hostname: String, port: Int): Unit =
self ! RegisterLocalNode(RemoteAddress(hostname, port))
/**
* Deregisters a local endpoint
*/
def deregisterLocalNode(hostname: String, port: Int): Unit =
self ! DeregisterLocalNode(RemoteAddress(hostname, port))
/**
* Broadcasts the specified message to all Actors of type Class on all known Nodes
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
self ! RelayedMessage(to.getName, msg)
}
/**
@ -232,28 +198,22 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
* Loads a specified ClusterActor and delegates to that instance.
*/
object Cluster extends Cluster with Logging {
//Import messages
import ClusterActor._
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
lazy val DEFAULT_CLUSTER_ACTOR_CLASS_NAME = classOf[JGroupsClusterActor].getName
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
@volatile private[remote] var clusterActorRef: Option[ActorRef] = None
@volatile private[akka] var classLoader : Option[ClassLoader] = Some(getClass.getClassLoader)
private[remote] def createClusterActor(loader: ClassLoader): Option[ActorRef] = {
private[remote] def createClusterActor(): Option[ActorRef] = {
val name = config.getString("akka.remote.cluster.actor", DEFAULT_CLUSTER_ACTOR_CLASS_NAME)
if (name.isEmpty) throw new IllegalArgumentException(
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
val serializer = Class.forName(config.getString(
"akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
serializer.classLoader = Some(loader)
try {
Some(Actor.actorOf {
val a = Class.forName(name).newInstance.asInstanceOf[ClusterActor]
a setSerializer serializer
a
})
Some(Actor.actorOf(Class.forName(name).newInstance.asInstanceOf[ClusterActor]))
} catch {
case e =>
log.error(e, "Couldn't load Cluster provider: [%s]", name)
@ -267,15 +227,27 @@ object Cluster extends Cluster with Logging {
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
Supervise(actor, LifeCycle(Permanent)) :: Nil)))
private[this] def clusterActor = if(clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor])
def name = clusterActor.map(_.name).getOrElse("No cluster")
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf))
def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.registerLocalNode(hostname, port))
/**Adds the specified hostname + port as a local node
* This information will be propagated to other nodes in the cluster
* and will be available at the other nodes through lookup and foreach
*/
def registerLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! RegisterLocalNode(RemoteAddress(hostname, port)))
def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port))
/**Removes the specified hostname + port from the local node
* This information will be propagated to other nodes in the cluster
* and will no longer be available at the other nodes through lookup and foreach
*/
def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! DeregisterLocalNode(RemoteAddress(hostname, port)))
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg))
/**Sends the message to all Actors of the specified type on all other nodes in the cluster
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActorRef.foreach(_ ! RelayedMessage(to.getName, msg))
def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f))
@ -283,14 +255,21 @@ object Cluster extends Cluster with Logging {
def start(serializerClassLoader: Option[ClassLoader]): Unit = synchronized {
log.info("Starting up Cluster Service...")
if (clusterActor.isEmpty) {
if (clusterActorRef.isEmpty) {
for {
actorRef <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader)
actorRef <- createClusterActor()
sup <- createSupervisor(actorRef)
} {
clusterActorRef = Some(actorRef.start)
clusterActor = Some(actorRef.actor.asInstanceOf[ClusterActor])
val serializer = Class.forName(config.getString(
"akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
classLoader = serializerClassLoader orElse classLoader
serializer.classLoader = classLoader
actorRef.start
sup.start
actorRef ! InitClusterActor(serializer)
clusterActorRef = Some(actorRef)
}
}
}
@ -301,6 +280,6 @@ object Cluster extends Cluster with Logging {
c <- clusterActorRef
s <- c.supervisor
} s.stop
clusterActor = None
classLoader = Some(getClass.getClassLoader)
}
}

View file

@ -1,84 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import com.sun.grizzly.http.SelectorThread;
import com.sun.grizzly.http.servlet.ServletAdapter;
import com.sun.grizzly.tcp.Adapter;
import com.sun.grizzly.standalone.StaticStreamAlgorithm;
import javax.ws.rs.core.UriBuilder;
import javax.servlet.Servlet;
import junit.framework.TestCase;
import org.junit.*;
import java.io.IOException;
import java.net.URI;
import se.scalablesolutions.akka.config.*;
import static se.scalablesolutions.akka.config.JavaConfig.*;
public class RestTest extends TestCase {
private static int PORT = 9998;
private static URI URI = UriBuilder.fromUri("http://localhost/").port(PORT).build();
private static SelectorThread selector = null;
private static ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
@BeforeClass
protected void setUp() {
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(
JerseyFoo.class,
new LifeCycle(new Permanent()),
10000000)
}).inject().supervise();
selector = startJersey();
}
public void testSimpleRequest() {
assertTrue(true);
}
/*
@Test
public void testSimpleRequest() throws IOException, InstantiationException {
selector.listen();
Client client = Client.create();
WebResource webResource = client.resource(URI);
String responseMsg = webResource.path("/foo").get(String.class);
assertEquals("hello foo", responseMsg);
selector.stopEndpoint();
}
*/
private static SelectorThread startJersey() {
try {
Servlet servlet = new se.scalablesolutions.akka.rest.AkkaServlet();
ServletAdapter adapter = new ServletAdapter();
adapter.setServletInstance(servlet);
adapter.setContextPath(URI.getPath());
return createGrizzlySelector(adapter, URI, PORT);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static SelectorThread createGrizzlySelector(Adapter adapter, URI uri, int port) throws IOException, InstantiationException {
final String scheme = uri.getScheme();
if (!scheme.equalsIgnoreCase("http"))
throw new IllegalArgumentException("The URI scheme, of the URI " + uri + ", must be equal (ignoring case) to 'http'");
final SelectorThread selectorThread = new SelectorThread();
selectorThread.setAlgorithmClassName(StaticStreamAlgorithm.class.getName());
selectorThread.setPort(port);
selectorThread.setAdapter(adapter);
return selectorThread;
}
}

View file

@ -1,29 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.rest
import com.sun.jersey.core.spi.component.ComponentScope
import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider
import se.scalablesolutions.akka.config.Configurator
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.Actor
class ActorComponentProvider(val clazz: Class[_], val configurators: List[Configurator])
extends IoCFullyManagedComponentProvider with Logging {
override def getScope = ComponentScope.Singleton
override def getInstance: AnyRef = {
val instances = for {
conf <- configurators
if conf.isDefined(clazz)
instance <- conf.getInstance(clazz)
} yield instance
if (instances.isEmpty) throw new IllegalArgumentException(
"No Actor or Active Object for class [" + clazz + "] could be found.\nMake sure you have defined and configured the class as an Active Object or Actor in a supervisor hierarchy.")
else instances.head.asInstanceOf[AnyRef]
}
}

View file

@ -1,20 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.rest
import com.sun.jersey.core.spi.component.ioc.{IoCComponentProvider,IoCComponentProviderFactory}
import com.sun.jersey.core.spi.component.{ComponentContext}
import se.scalablesolutions.akka.config.Configurator
import se.scalablesolutions.akka.util.Logging
class ActorComponentProviderFactory(val configurators: List[Configurator])
extends IoCComponentProviderFactory with Logging {
override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz)
override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = {
configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
}
}

View file

@ -5,25 +5,17 @@
package se.scalablesolutions.akka.comet
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.rest.{AkkaServlet => RestServlet}
import java.util.{List => JList}
import javax.servlet.ServletConfig
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import com.sun.jersey.spi.container.servlet.ServletContainer
import org.atmosphere.container.GrizzlyCometSupport
import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
/**
* Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a
* standard servlet container, e.g. not using the Akka Kernel.
* <p/>
* Used by the Akka Kernel to bootstrap REST and Comet.
*/
class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging {
val servlet = new RestServlet with AtmosphereServletProcessor {
class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProcessor {
//Delegate to implement the behavior for AtmosphereHandler
private val handler = new AbstractReflectorAtmosphereHandler {
override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
@ -44,6 +36,18 @@ class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging {
}
}
/**
* Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a
* standard servlet container, e.g. not using the Akka Kernel.
* <p/>
* Used by the Akka Kernel to bootstrap REST and Comet.
*/
class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging {
lazy val servlet = createRestServlet
protected def createRestServlet : AtmosphereRestServlet = new AtmosphereRestServlet {
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
}
/**
* We override this to avoid Atmosphere looking for it's atmosphere.xml file
* Instead we specify what semantics we want in code.

View file

@ -1,33 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.rest
import se.scalablesolutions.akka.config.ConfiguratorRepository
import se.scalablesolutions.akka.config.Config.config
import com.sun.jersey.api.core.ResourceConfig
import com.sun.jersey.spi.container.servlet.ServletContainer
import com.sun.jersey.spi.container.WebApplication
/**
* Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container,
* e.g. not using the Akka Kernel.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AkkaServlet extends ServletContainer {
import scala.collection.JavaConversions._
override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = {
val configurators = ConfiguratorRepository.getConfigurators
resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces))
resourceConfig.getProperties.put(
"com.sun.jersey.spi.container.ResourceFilters",
config.getList("akka.rest.filters").mkString(","))
webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
}
}

View file

@ -5,14 +5,15 @@
package se.scalablesolutions.akka.kernel
import com.sun.grizzly.http.SelectorThread
import com.sun.grizzly.http.servlet.ServletAdapter
import com.sun.grizzly.http.servlet.{ ServletAdapter }
import com.sun.grizzly.standalone.StaticStreamAlgorithm
import javax.ws.rs.core.UriBuilder
import javax.servlet.ServletConfig
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.comet.AkkaServlet
import se.scalablesolutions.akka.comet.{ AkkaServlet }
/**
* Handles the Akka Comet Support (load/unload)
@ -42,9 +43,31 @@ trait EmbeddedAppServer extends Bootable with Logging {
val adapter = new ServletAdapter
adapter.setHandleStaticResources(true)
adapter.setServletInstance(new AkkaServlet)
adapter.setServletInstance(new AkkaServlet {
override def init(sc : ServletConfig) : Unit = {
val cl = Thread.currentThread.getContextClassLoader
try {
Thread.currentThread.setContextClassLoader(applicationLoader.get)
super.init(sc)
}
finally {
Thread.currentThread.setContextClassLoader(cl)
}
}
})
adapter.setContextPath(uri.getPath)
adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
adapter.addInitParameter("cometSupport",
"org.atmosphere.container.GrizzlyCometSupport")
adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig")
adapter.addInitParameter("com.sun.jersey.config.property.packages",
config.getList("akka.rest.resource_packages").mkString(";")
)
adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters",
config.getList("akka.rest.filters").mkString(",")
)
if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolders, adapter.getContextPath)
@ -65,9 +88,10 @@ trait EmbeddedAppServer extends Bootable with Logging {
abstract override def onUnload = {
super.onUnload
if (jerseySelectorThread.isDefined) {
jerseySelectorThread foreach { (t) => {
log.info("Shutting down REST service (Jersey)")
jerseySelectorThread.get.stopEndpoint
t.stopEndpoint
}
}
}
}

View file

@ -45,21 +45,19 @@ class Boot {
// Publish subscribe example
//
// Cometd example is disabled because of unresolved sbt/ivy dependency resolution issues.
// If you want to run this example, make sure to replace all jetty-*-6.1.22.jar files
// on the classpath with corresponding jetty-*-6.1.11.jar files.
// Cometd example commented out because camel-cometd is broken in Camel 2.3
//
//val cometdUri = "cometd://localhost:8111/test/abc?resourceBase=target"
//val cometdSubscriber = new Subscriber("cometd-subscriber", cometdUri).start
//val cometdPublisher = new Publisher("cometd-publisher", cometdUri).start
//val cometdUri = "cometd://localhost:8111/test/abc?baseResource=file:target"
//val cometdSubscriber = actorOf(new Subscriber("cometd-subscriber", cometdUri)).start
//val cometdPublisher = actorOf(new Publisher("cometd-publisher", cometdUri)).start
val jmsUri = "jms:topic:test"
val jmsSubscriber1 = actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start
val jmsSubscriber2 = actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start
val jmsPublisher = actorOf(new Publisher("jms-publisher", jmsUri)).start
//val cometdPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher).start
//val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start
val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start
actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor

View file

@ -11,12 +11,12 @@ import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.comet.AkkaClusterBroadcastFilter
import scala.xml.NodeSeq
import java.lang.Integer
import java.nio.ByteBuffer
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
import org.atmosphere.util.XSSHtmlFilter
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
@ -27,16 +27,13 @@ class Boot {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])),
Supervise(
actorOf[SimpleService],
actorOf[SimpleServiceActor],
LifeCycle(Permanent)) ::
Supervise(
actorOf[Chat],
actorOf[ChatActor],
LifeCycle(Permanent)) ::
Supervise(
actorOf[PersistentSimpleService],
LifeCycle(Permanent)) ::
Supervise(
actorOf[PubSub],
actorOf[PersistentSimpleServiceActor],
LifeCycle(Permanent))
:: Nil))
factory.newInstance.start
@ -50,19 +47,25 @@ class Boot {
* Or browse to the URL from a web browser.
*/
@Path("/scalacount")
class SimpleService extends Transactor {
case object Tick
class SimpleService {
@GET
@Produces(Array("text/html"))
def count = {
//Fetch the first actor of type SimpleServiceActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
}
}
class SimpleServiceActor extends Transactor {
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = TransactionalState.newMap[String, Integer]
@GET
@Produces(Array("text/html"))
def count = (self !! Tick).getOrElse(<error>Error in counter</error>)
def receive = {
case Tick => if (hasStartedTicking) {
case "Tick" => if (hasStartedTicking) {
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
storage.put(KEY, new Integer(counter + 1))
self.reply(<success>Tick:{counter + 1}</success>)
@ -75,9 +78,7 @@ class SimpleService extends Transactor {
}
@Path("/pubsub/")
class PubSub extends Actor {
case class Msg(topic: String, message: String)
class PubSub {
@GET
@Suspend
@Produces(Array("text/plain;charset=ISO-8859-1"))
@ -90,8 +91,6 @@ class PubSub extends Actor {
@Produces(Array("text/plain;charset=ISO-8859-1"))
//FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "foo")
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
def receive = { case _ => }
}
/**
@ -102,19 +101,26 @@ class PubSub extends Actor {
* Or browse to the URL from a web browser.
*/
@Path("/persistentscalacount")
class PersistentSimpleService extends Transactor {
class PersistentSimpleService {
@GET
@Produces(Array("text/html"))
def count = {
//Fetch the first actor of type PersistentSimpleServiceActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
}
}
case object Tick
class PersistentSimpleServiceActor extends Transactor {
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = CassandraStorage.newMap
@GET
@Produces(Array("text/html"))
def count = (self !! Tick).getOrElse(<error>Error in counter</error>)
def receive = {
case Tick => if (hasStartedTicking) {
case "Tick" => if (hasStartedTicking) {
val bytes = storage.get(KEY.getBytes).get
val counter = ByteBuffer.wrap(bytes).getInt
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
@ -128,16 +134,37 @@ class PersistentSimpleService extends Transactor {
}
@Path("/chat")
class Chat extends Actor with Logging {
case class Chat(val who: String, val what: String, val msg: String)
class Chat {
import ChatActor.ChatMsg
@Suspend
@GET
@Produces(Array("text/html"))
def suspend = ()
@POST
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
//FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar")
@Consumes(Array("application/x-www-form-urlencoded"))
@Produces(Array("text/html"))
def publishMessage(form: MultivaluedMap[String, String]) = {
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
//Fetch the first actor of type ChatActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[ChatActor]).headOption
r <- a.!![String](msg)} yield r
//Return either the resulting String or a default one
result getOrElse "System__error"
}
}
object ChatActor {
case class ChatMsg(val who: String, val what: String, val msg: String)
}
class ChatActor extends Actor with Logging {
import ChatActor.ChatMsg
def receive = {
case Chat(who, what, msg) => {
case ChatMsg(who, what, msg) => {
what match {
case "login" => self.reply("System Message__" + who + " has joined.")
case "post" => self.reply("" + who + "__" + msg)
@ -146,16 +173,6 @@ class Chat extends Actor with Logging {
}
case x => log.info("recieve unknown: " + x)
}
@POST
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
//FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar")
@Consumes(Array("application/x-www-form-urlencoded"))
@Produces(Array("text/html"))
def publishMessage(form: MultivaluedMap[String, String]) =
(self !! Chat(form.getFirst("name"),
form.getFirst("action"),
form.getFirst("message"))).getOrElse("System__error")
}

View file

@ -10,6 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo}
import se.scalablesolutions.akka.stm.TransactionalState
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
class Boot {
val factory = SupervisorFactory(
@ -90,12 +91,7 @@ import javax.annotation.security.{RolesAllowed, DenyAll, PermitAll}
import javax.ws.rs.{GET, Path, Produces}
@Path("/secureticker")
class SecureTickActor extends Transactor with Logging {
case object Tick
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = TransactionalState.newMap[String, Integer]
class SecureTickService {
/**
* allow access for any user to "/secureticker/public"
@ -123,15 +119,25 @@ class SecureTickActor extends Transactor with Logging {
@DenyAll
def paranoiaTick = tick
def tick = (self !! Tick) match {
case (Some(counter)) => (<success>Tick:
{counter}
</success>)
case _ => (<error>Error in counter</error>)
def tick = {
//Fetch the first actor of type PersistentSimpleServiceActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption
r <- a.!![Integer]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result match {
case (Some(counter)) => (<success>Tick: {counter}</success>)
case _ => (<error>Error in counter</error>)
}
}
}
class SecureTickActor extends Transactor with Logging {
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = TransactionalState.newMap[String, Integer]
def receive = {
case Tick => if (hasStartedTicking) {
case "Tick" => if (hasStartedTicking) {
val counter = storage.get(KEY).get.intValue
storage.put(KEY, counter + 1)
self.reply(new Integer(counter + 1))

View file

@ -54,6 +54,7 @@
hostname = "localhost"
port = 9998
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
#IF you are using a KerberosAuthenticationActor

View file

@ -40,20 +40,38 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val dist = zipTask(allArtifacts, "dist", distName) dependsOn (`package`) describedAs("Zips up the distribution.")
// ------------------------------------------------------------
// repositories
val embeddedrepo = "embedded repo" at (info.projectPath / "embedded-repo").asURL.toString
val sunjdmk = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo"
// -------------------------------------------------------------------------------------------------------------------
// Repositories
// Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases)
// must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
// Therefore, if repositories are defined, this must happen as def, not as val.
// -------------------------------------------------------------------------------------------------------------------
val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here!
val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
def guiceyFruitRepo = "GuiceyFruit Repo" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", guiceyFruitRepo)
def jbossRepo = "JBoss Repo" at "https://repository.jboss.org/nexus/content/groups/public/"
val jbossModuleConfig = ModuleConfiguration("org.jboss", jbossRepo)
val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", jbossRepo)
val jgroupsModuleConfig = ModuleConfiguration("jgroups", jbossRepo)
def sunjdmkRepo = "Sun JDMK Repo" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo"
val jmsModuleConfig = ModuleConfiguration("javax.jms", sunjdmkRepo)
val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", sunjdmkRepo)
val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", sunjdmkRepo)
def javaNetRepo = "java.net Repo" at "http://download.java.net/maven/2"
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", javaNetRepo)
val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", javaNetRepo)
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", javaNetRepo)
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", javaNetRepo)
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
/* These are not needed and can possibly be deleted.
val databinder = "DataBinder" at "http://databinder.net/repo"
// val configgy = "Configgy" at "http://www.lag.net/repo"
val codehaus = "Codehaus" at "http://repository.codehaus.org"
val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val jboss = "jBoss" at "https://repository.jboss.org/nexus/content/groups/public/"
val guiceyfruit = "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository"
val java_net = "java.net" at "http://download.java.net/maven/2"
val scala_tools_snapshots = "scala-tools snapshots" at "http://scala-tools.org/repo-snapshots"
val scala_tools_releases = "scala-tools releases" at "http://scala-tools.org/repo-releases"
*/
// ------------------------------------------------------------
// project defintions
@ -106,6 +124,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
" dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version)
)
//Exclude slf4j1.5.11 from the classpath, it's conflicting...
override def runClasspath = super.runClasspath --- (super.runClasspath ** "slf4j*1.5.11.jar")
// ------------------------------------------------------------
// publishing
override def managedStyle = ManagedStyle.Maven
@ -213,7 +234,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val camel_core = "org.apache.camel" % "camel-core" % "2.2.0" % "compile"
val camel_core = "org.apache.camel" % "camel-core" % "2.3.0" % "compile"
}
class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
@ -316,11 +337,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) {
val commons_codec = "commons-codec" % "commons-codec" % "1.3" % "compile"
val spring_jms = "org.springframework" % "spring-jms" % "3.0.1.RELEASE" % "compile"
val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.2.0" % "compile"
val camel_jms = "org.apache.camel" % "camel-jms" % "2.2.0" % "compile"
val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.0" % "compile"
val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.3.0" % "compile"
val camel_jms = "org.apache.camel" % "camel-jms" % "2.3.0" % "compile"
val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.2" % "compile"
}
class AkkaSampleSecurityProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) {