Akka LIVES!

This commit is contained in:
Viktor Klang 2010-02-22 00:45:11 +01:00
parent a16c6da3b7
commit c28a283d91
18 changed files with 89 additions and 53 deletions

View file

@ -13,7 +13,6 @@ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.{HashCode, Logging}
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.util.concurrent.ConcurrentHashMap
import java.util.{Timer, TimerTask}
@ -78,6 +77,8 @@ object AMQP {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AMQPSupervisor extends Actor {
import scala.collection.JavaConversions._
private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor]
faultHandler = Some(OneForOneStrategy(5, 5000))
@ -137,7 +138,7 @@ object AMQP {
}
override def shutdown = {
connections.values.asScala.foreach(_ ! Stop)
asMap(connections).values.foreach(_ ! Stop)
exit
}
@ -360,9 +361,14 @@ object AMQP {
extends FaultTolerantConnectionActor {
consumer: Consumer =>
import scala.collection.JavaConversions._
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
//FIXME use better strategy to convert scala.immutable.Map to java.util.Map
private val jConfigMap = configurationArguments.foldLeft(new java.util.HashMap[String,Object]){ (m,kv) => { m.put(kv._1,kv._2); m } }
private val listeners = new HashMap[MessageConsumerListener, MessageConsumerListener]
setupChannel
@ -410,7 +416,7 @@ object AMQP {
protected def setupChannel = {
connection = connectionFactory.newConnection(hostname, port)
channel = connection.createChannel
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString, passive, durable, autoDelete, configurationArguments)
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString, passive, durable, autoDelete, jConfigMap)
listeners.elements.toList.map(_._2).foreach(registerListener)
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}
@ -425,7 +431,7 @@ object AMQP {
listener.queueName,
passive, durable,
listener.exclusive, listener.autoDelete,
configurationArguments)
jConfigMap)
}
log.debug("Binding new queue for MessageConsumerListener [%s]", listener.queueName)

View file

@ -41,11 +41,11 @@
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<!--dependency>
<artifactId>akka-persistence-redis</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
</dependency-->
<dependency>
<artifactId>akka-comet</artifactId>
<groupId>${project.groupId}</groupId>

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.actor
import config.ScalaConfig._
import se.scalablesolutions.akka.config.ScalaConfig._
import org.scalatest.Suite
import patterns.Patterns

View file

@ -90,7 +90,7 @@ trait CassandraSession extends Closeable with Flushable {
def ++|(key: String, batch: Map[String, List[ColumnOrSuperColumn]], consistencyLevel: Int): Unit = {
val jmap = new java.util.HashMap[String, JList[ColumnOrSuperColumn]]
for (entry <- batch; (key, value) = entry) jmap.put(key, value)
for (entry <- batch; (key, value) = entry) jmap.put(key, new java.util.ArrayList(value))
client.batch_insert(keyspace, key, jmap, consistencyLevel)
}

View file

@ -95,9 +95,20 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
removedEntries.clear
}
def -=(key: K) = remove(key)
def -=(key: K) = {
remove(key)
this
}
def +=(key: K, value: V) = put(key, value)
override def +=(kv : (K,V)) = {
put(kv._1,kv._2)
this
}
def +=(key: K, value: V) = {
put(key, value)
this
}
override def put(key: K, value: V): Option[V] = {
register
@ -109,9 +120,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
newAndUpdatedEntries.update(key, value)
}
def remove(key: K) = {
override def remove(key: K) = {
register
removedEntries.add(key)
newAndUpdatedEntries.get(key)
}
def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
@ -144,6 +156,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
} catch { case e: Exception => None }
}
def iterator = elements
override def elements: Iterator[Tuple2[K, V]] = {
new Iterator[Tuple2[K, V]] {
private val originalList: List[Tuple2[K, V]] = try {
@ -386,14 +400,20 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
override def isEmpty: Boolean =
size == 0
override def +=(elem: A): Unit = enqueue(elem)
override def ++=(elems: Iterator[A]): Unit = enqueue(elems.toList: _*)
override def ++=(elems: Iterable[A]): Unit = this ++= elems.elements
override def +=(elem: A) = {
enqueue(elem)
this
}
override def ++=(elems: Iterator[A]) = {
enqueue(elems.toList: _*)
this
}
def ++=(elems: Iterable[A]): Unit = this ++= elems.elements
override def dequeueFirst(p: A => Boolean): Option[A] =
throw new UnsupportedOperationException("dequeueFirst not supported")
override def dequeueAll(p: A => Boolean): Seq[A] =
override def dequeueAll(p: A => Boolean): scala.collection.mutable.Seq[A] =
throw new UnsupportedOperationException("dequeueAll not supported")
private def register = {

View file

@ -9,6 +9,8 @@ import se.scalablesolutions.akka.Config.config
import sjson.json.Serializer._
import java.util.NoSuchElementException
import com.mongodb._
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
@ -123,7 +125,7 @@ private[akka] object MongoStorageBackend extends
val m =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
@ -141,7 +143,7 @@ private[akka] object MongoStorageBackend extends
val m =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
@ -179,7 +181,7 @@ private[akka] object MongoStorageBackend extends
}
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
throw new NoSuchElementException(e.getMessage)
}
}
@ -219,7 +221,7 @@ private[akka] object MongoStorageBackend extends
val o =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
@ -228,7 +230,7 @@ private[akka] object MongoStorageBackend extends
o.get(index).asInstanceOf[Array[Byte]])
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
throw new NoSuchElementException(e.getMessage)
}
}
@ -238,7 +240,7 @@ private[akka] object MongoStorageBackend extends
val o =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
@ -252,7 +254,7 @@ private[akka] object MongoStorageBackend extends
yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
throw new NoSuchElementException(e.getMessage)
}
}

View file

@ -6,6 +6,7 @@ import org.junit.{Test, Before}
import org.junit.Assert._
import _root_.dispatch.json._
import _root_.dispatch.json.Js._
import java.util.NoSuchElementException
@scala.reflect.BeanInfo case class Foo(no: Int, name: String)
class MongoStorageSpec extends TestCase {
@ -111,12 +112,12 @@ class MongoStorageSpec extends TestCase {
try {
MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1)
fail("should throw an exception")
} catch {case e: Predef.NoSuchElementException => {}}
} catch {case e: NoSuchElementException => {}}
try {
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
fail("should throw an exception")
} catch {case e: Predef.NoSuchElementException => {}}
} catch {case e: NoSuchElementException => {}}
}
@Test
@ -198,7 +199,7 @@ class MongoStorageSpec extends TestCase {
try {
MongoStorageBackend.getMapStorageFor("U-M2")
fail("should throw an exception")
} catch {case e: Predef.NoSuchElementException => {}}
} catch {case e: NoSuchElementException => {}}
changeSetM.clear
}

View file

@ -107,7 +107,7 @@ private [akka] object RedisStorageBackend extends
def removeMapStorageFor(name: String): Unit = {
db.keys("%s:*".format(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new java.util.NoSuchElementException(name + " not present")
case Some(keys) =>
keys.foreach(db.delete(_))
}
@ -120,7 +120,7 @@ private [akka] object RedisStorageBackend extends
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] =
db.get(makeRedisKey(name, key)) match {
case None =>
throw new Predef.NoSuchElementException(new String(key) + " not present")
throw new java.util.NoSuchElementException(new String(key) + " not present")
case Some(s) => Some(s.getBytes)
}
@ -135,7 +135,7 @@ private [akka] object RedisStorageBackend extends
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new java.util.NoSuchElementException(name + " not present")
case Some(keys) =>
keys.map(key => (makeKeyFromRedisKey(key)._2, db.get(key).get.getBytes)).toList
}
@ -203,7 +203,7 @@ private [akka] object RedisStorageBackend extends
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
db.listIndex(new String(encode(name.getBytes)), index) match {
case None =>
throw new Predef.NoSuchElementException(name + " does not have element at " + index)
throw new java.util.NoSuchElementException(name + " does not have element at " + index)
case Some(e) => e.getBytes
}
}
@ -223,7 +223,7 @@ private [akka] object RedisStorageBackend extends
else count
db.listRange(new String(encode(name.getBytes)), s, s + cnt - 1) match {
case None =>
throw new Predef.NoSuchElementException(name + " does not have elements in the range specified")
throw new java.util.NoSuchElementException(name + " does not have elements in the range specified")
case Some(l) =>
l map (_.getBytes)
}
@ -232,7 +232,7 @@ private [akka] object RedisStorageBackend extends
def getVectorStorageSizeFor(name: String): Int = {
db.listLength(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new java.util.NoSuchElementException(name + " not present")
case Some(l) => l
}
}
@ -244,7 +244,7 @@ private [akka] object RedisStorageBackend extends
def getRefStorageFor(name: String): Option[Array[Byte]] = {
db.get(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new java.util.NoSuchElementException(name + " not present")
case Some(s) => Some(s.getBytes)
}
}
@ -258,7 +258,7 @@ private [akka] object RedisStorageBackend extends
def dequeue(name: String): Option[Array[Byte]] = {
db.popHead(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new java.util.NoSuchElementException(name + " not present")
case Some(s) =>
Some(s.getBytes)
}
@ -268,7 +268,7 @@ private [akka] object RedisStorageBackend extends
def size(name: String): Int = {
db.listLength(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new java.util.NoSuchElementException(name + " not present")
case Some(l) => l
}
}
@ -279,14 +279,14 @@ private [akka] object RedisStorageBackend extends
case 1 =>
db.listIndex(new String(encode(name.getBytes)), start) match {
case None =>
throw new Predef.NoSuchElementException("No element at " + start)
throw new java.util.NoSuchElementException("No element at " + start)
case Some(s) =>
List(s.getBytes)
}
case n =>
db.listRange(new String(encode(name.getBytes)), start, start + count - 1) match {
case None =>
throw new Predef.NoSuchElementException(
throw new java.util.NoSuchElementException(
"No element found between " + start + " and " + (start + count - 1))
case Some(es) =>
es.map(_.getBytes)
@ -312,7 +312,7 @@ private [akka] object RedisStorageBackend extends
def zcard(name: String): Int = {
db.zCard(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new java.util.NoSuchElementException(name + " not present")
case Some(l) => l
}
}
@ -320,7 +320,7 @@ private [akka] object RedisStorageBackend extends
def zscore(name: String, item: Array[Byte]): String = {
db.zScore(new String(encode(name.getBytes)), new String(item)) match {
case None =>
throw new Predef.NoSuchElementException(new String(item) + " not present")
throw new java.util.NoSuchElementException(new String(item) + " not present")
case Some(s) => s
}
}
@ -328,7 +328,7 @@ private [akka] object RedisStorageBackend extends
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
db.zRange(new String(encode(name.getBytes)), start.toString, end.toString, SocketOperations.ASC, false) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
throw new java.util.NoSuchElementException(name + " not present")
case Some(s) =>
s.map(_.getBytes)
}

View file

@ -15,7 +15,7 @@
<modules>
<module>akka-persistence-common</module>
<module>akka-persistence-redis</module>
<!--module>akka-persistence-redis</module-->
<module>akka-persistence-mongo</module>
<module>akka-persistence-cassandra</module>
</modules>

View file

@ -7,8 +7,8 @@ package se.scalablesolutions.akka.rest
import com.sun.jersey.core.spi.component.ComponentScope
import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider
import config.Configurator
import util.Logging
import se.scalablesolutions.akka.config.Configurator
import se.scalablesolutions.akka.util.Logging
class ActorComponentProvider(val clazz: Class[_], val configurators: List[Configurator])
extends IoCFullyManagedComponentProvider with Logging {

View file

@ -27,7 +27,7 @@ class AkkaServlet extends ServletContainer {
resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces))
resourceConfig.getProperties.put(
"com.sun.jersey.spi.container.ResourceFilters",
Config.config.getList("akka.rest.filters").mkString(","))
se.scalablesolutions.akka.Config.config.getList("akka.rest.filters").mkString(","))
webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
}

View file

@ -5,6 +5,7 @@ import _root_.net.liftweb.http._
import _root_.net.liftweb.sitemap._
import _root_.net.liftweb.sitemap.Loc._
import _root_.net.liftweb.http.auth._
import _root_.net.liftweb.common._
import Helpers._
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
@ -23,7 +24,7 @@ class Boot {
LiftRules.addToPackages("sample.lift")
LiftRules.httpAuthProtectedResource.prepend {
case (ParsePath("liftcount" :: Nil, _, _, _)) => Full(AuthRole("admin"))
case (Req("liftcount" :: Nil, _, _)) => Full(AuthRole("admin"))
}
LiftRules.authentication = HttpBasicAuthentication("lift") {

View file

@ -1,4 +1,4 @@
import _root_.bootstrap.liftweb.Boot
/*import _root_.bootstrap.liftweb.Boot
import _root_.scala.tools.nsc.MainGenericRunner
object LiftConsole {
@ -13,3 +13,4 @@ object LiftConsole {
exit(0)
}
}
*/

View file

@ -86,7 +86,7 @@ class PubSub extends Actor {
@Broadcast
@Path("/topic/{topic}/{message}/")
@Produces(Array("text/plain;charset=ISO-8859-1"))
@Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "foo" }
//FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "foo")
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
def receive = { case _ => }
@ -147,7 +147,7 @@ class Chat extends Actor {
@POST
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
@Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "bar" }
//FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar")
@Consumes(Array("application/x-www-form-urlencoded"))
@Produces(Array("text/html"))
def publishMessage(form: MultivaluedMap[String, String]) =
@ -157,7 +157,7 @@ class Chat extends Actor {
}
class JsonpFilter extends BroadcastFilter[String] with Logging {
class JsonpFilter extends BroadcastFilter with Logging {
def filter(an: AnyRef) = {
val m = an.toString
var name = m

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.security.samples
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.security.{DigestAuthenticationActor, UserInfo}
import se.scalablesolutions.akka.security._
import se.scalablesolutions.akka.state.TransactionalState
class Boot {

View file

@ -14,7 +14,7 @@
</parent>
<modules>
<module>akka-sample-chat</module>
<!--module>akka-sample-chat</module-->
<module>akka-sample-lift</module>
<module>akka-sample-security</module>
<module>akka-sample-rest-scala</module>

View file

@ -27,17 +27,22 @@
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.1.3-ea</version>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.3-rc1</version>
</dependency>
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-util</artifactId>
<version>1.1-M6</version>
<version>${lift.version}</version>
</dependency>
<!-- For Testing -->

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.security
import config.ScalaConfig._
import se.scalablesolutions.akka.config.ScalaConfig._
import org.scalatest.Suite
import org.scalatest.junit.JUnitSuite