added configuration system based on Configgy, now JMX enabled + fixed a couple of bugs

parent c1b6740e49, commit 83ada023d3
45 changed files with 2100 additions and 1253 deletions
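Most of the hard-coded settings touched in the diff below are now read through Configgy from an akka.conf file (loaded by Kernel.setupConfig) and exposed over JMX. A minimal sketch of what such a file could look like, assuming Configgy's nested-block syntax; the keys and defaults are taken from the getBool/getInt/getString calls in this commit, while the grouping and sample values are illustrative only:

  <akka>
    version = "..."                  # printed by the banner, defaults to "awesome"
    <actor>
      timeout = 5000                 # default timeout (ms) for '!!' invocations
      concurrent-mode = false
    </actor>
    <stm>
      service = true                 # toggles transactionality in TransactionManagement
    </stm>
    <remote>
      service = true
      hostname = "localhost"
      port = 9999
      connection-timeout = 1000
    </remote>
    <rest>
      service = true
      hostname = "localhost"
      port = 9998
    </rest>
    <storage>
      system = "cassandra"           # only "cassandra" is implemented; other backends throw UnsupportedOperationException
      <cassandra>
        service = true
        storage-format = "serialization"
        blocking = 0
        <thrift-server>
          service = false
          pidfile = "akka.pid"
        </thrift-server>
      </cassandra>
    </storage>
  </akka>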
@@ -5,8 +5,7 @@
package se.scalablesolutions.akka

import java.io.File
import java.lang.reflect.Method
import java.net.{URL, URLClassLoader}
import java.net.URLClassLoader
import kernel.util.Logging

/**
@@ -4,127 +4,160 @@

package se.scalablesolutions.akka.kernel

//import org.apache.zookeeper.jmx.ManagedUtil
//import org.apache.zookeeper.server.persistence.FileTxnSnapLog
//import org.apache.zookeeper.server.ServerConfig
//import org.apache.zookeeper.server.NIOServerCnxn

//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
//import voldemort.server.{VoldemortConfig, VoldemortServer}
//import voldemort.versioning.Versioned

import com.sun.grizzly.http.SelectorThread
import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory

import java.io.IOException
import java.net.URI
import java.util.{Map, HashMap}
import java.io.{File, IOException}
import com.sun.grizzly.http.servlet.ServletAdapter
import com.sun.grizzly.standalone.StaticStreamAlgorithm

import javax.ws.rs.core.UriBuilder
import javax.management.JMException
import kernel.nio.{RemoteClient, RemoteServer}
import kernel.state.CassandraNode

import net.lag.configgy.{Config, Configgy, RuntimeEnvironment}

import kernel.jersey.AkkaServlet
import kernel.nio.RemoteServer
import kernel.state.CassandraStorage
import kernel.util.Logging

/**
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
object Kernel extends Logging {
val config = setupConfig

val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true)
val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra")

val SERVER_URL = "localhost"
/*
private[this] var storageFactory: StoreClientFactory = _
private[this] var storageServer: VoldemortServer = _
*/

private[this] var remoteServer: RemoteServer = _
// FIXME add API to shut server down gracefully
private var remoteServer: RemoteServer = _
private var jerseySelectorThread: SelectorThread = _
private val startTime = System.currentTimeMillis

def main(args: Array[String]): Unit = {
printBanner
log.info("Starting Akka kernel...")
startRemoteService
startCassandra
//cassandraBenchmark

//startJersey
if (RUN_REMOTE_SERVICE) startRemoteService
STORAGE_SYSTEM match {
case "cassandra" => startCassandra
case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported")
case "tokyo-tyrant" => throw new UnsupportedOperationException("tokyo-tyrant storage backend is not yet supported")
case "hazelcast" => throw new UnsupportedOperationException("hazelcast storage backend is not yet supported")
}
if (RUN_REST_SERVICE) startJersey

//startZooKeeper
//startVoldemort
//cassandraBenchmark
log.info("Akka kernel started successfully")
}

def uptime = (System.currentTimeMillis - startTime) / 1000

def setupConfig: Config = {
try {
Configgy.configure(akka.Boot.CONFIG + "/akka.conf")
val runtime = new RuntimeEnvironment(getClass)
//runtime.load(args)
val config = Configgy.config
config.registerWithJmx("com.scalablesolutions.akka.config")
// config.subscribe { c => configure(c.getOrElse(new Config)) }
config
} catch {
case e: net.lag.configgy.ParseException => throw new Error("Could not retrieve the akka.conf config file. Make sure you have set the AKKA_HOME environment variable to the root of the distribution.")
}
}

private[akka] def startRemoteService = {
// FIXME manage remote server thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() {
def run = {
RemoteServer.start
}
})
def run = RemoteServer.start
}, "akka remote service")
remoteServerThread.start
Thread.sleep(1000) // wait for server to start up
}

private[akka] def startCassandra = {
CassandraNode.start
private[akka] def startCassandra = if (kernel.Kernel.config.getBool("akka.storage.cassandra.service", true)) {
System.setProperty("cassandra", "")
System.setProperty("storage-config", akka.Boot.CONFIG + "/")
CassandraStorage.start
}

private[akka] def startJersey = {
val JERSEY_SERVER_URL = "http://" + SERVER_URL + "/"
val JERSEY_SERVER_PORT = 9998
val JERSEY_REST_CLASSES_ROOT_PACKAGE = "se.scalablesolutions.akka.kernel"
val JERSEY_BASE_URI = UriBuilder.fromUri(JERSEY_SERVER_URL).port(getPort(JERSEY_SERVER_PORT)).build()
val initParams = new java.util.HashMap[String, String]
initParams.put("com.sun.jersey.config.property.packages", JERSEY_REST_CLASSES_ROOT_PACKAGE)
val threadSelector = GrizzlyWebContainerFactory.create(JERSEY_BASE_URI, initParams)
// TODO: handle shutdown of Jersey in separate thread
// TODO: spawn main in new thread and communicate using socket
System.in.read
threadSelector.stopEndpoint
val JERSEY_HOSTNAME = kernel.Kernel.config.getString("akka.rest.hostname", "localhost")
val JERSEY_URL = "http://" + JERSEY_HOSTNAME + "/"
val JERSEY_PORT = kernel.Kernel.config.getInt("akka.rest.port", 9998)

val uri = UriBuilder.fromUri(JERSEY_URL).port(JERSEY_PORT).build()
val adapter = new ServletAdapter
val servlet = classOf[AkkaServlet].newInstance
adapter.setServletInstance(servlet)
adapter.setContextPath(uri.getPath)

val scheme = uri.getScheme
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException("The URI scheme, of the URI " + JERSEY_URL + ", must be equal (ignoring case) to 'http'")

jerseySelectorThread = new SelectorThread
jerseySelectorThread.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
jerseySelectorThread.setPort(JERSEY_PORT)
jerseySelectorThread.setAdapter(adapter)
jerseySelectorThread.listen
log.info("REST service started successfully. Listening to port [" + JERSEY_PORT + "]")
}

private def printBanner = {
log.info(
"""==============================
__ __
_____ | | _| | _______
\__ \ | |/ / |/ /\__ \
/ __ \| <| < / __ \_
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
""")
log.info(" Running version " + kernel.Kernel.config.getString("akka.version", "awesome"))
log.info("==============================")
}

private def cassandraBenchmark = {
val NR_ENTRIES = 100000

println("=================================================")
var start = System.currentTimeMillis
for (i <- 1 to NR_ENTRIES) CassandraNode.insertMapStorageEntryFor("test", i.toString, "data")
for (i <- 1 to NR_ENTRIES) CassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
var end = System.currentTimeMillis
println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))

/*
FIXME: batch_insert fails with the following exception:

ERROR - Exception was generated at : 04/27/2009 15:26:35 on thread main
[B cannot be cast to org.apache.cassandra.db.WriteResponse
java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.WriteResponse
at org.apache.cassandra.service.WriteResponseResolver.resolve(WriteResponseResolver.java:50)
at org.apache.cassandra.service.WriteResponseResolver.resolve(WriteResponseResolver.java:31)
at org.apache.cassandra.service.QuorumResponseHandler.get(QuorumResponseHandler.java:101)
at org.apache.cassandra.service.StorageProxy.insertBlocking(StorageProxy.java:135)
at org.apache.cassandra.service.CassandraServer.batch_insert_blocking(CassandraServer.java:489)
at se.scalablesolutions.akka.kernel.CassandraNode$.insertHashEntries(CassandraNode.scala:59)
at se.scalablesolutions.akka.kernel.Kernel$.cassandraBenchmark(Kernel.scala:91)
at se.scalablesolutions.akka.kernel.Kernel$.main(Kernel.scala:52)
at se.scalablesolutions.akka.kernel.Kernel.main(Kernel.scala)

println("=================================================")
var start = System.currentTimeMillis
println(start)
val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
CassandraNode.insertHashEntries("test", entries.toList)
var end = System.currentTimeMillis
println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
*/
println("=================================================")
start = System.currentTimeMillis
for (i <- 1 to NR_ENTRIES) CassandraNode.getMapStorageEntryFor("test", i.toString)
val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
CassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
end = System.currentTimeMillis
println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))

println("=================================================")
start = System.currentTimeMillis
for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString)
end = System.currentTimeMillis
println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))

System.exit(0)
}
}

/*
//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
//import voldemort.server.{VoldemortConfig, VoldemortServer}
//import voldemort.versioning.Versioned

private[this] var storageFactory: StoreClientFactory = _
private[this] var storageServer: VoldemortServer = _
*/

// private[akka] def startVoldemort = {
// val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
// val VOLDEMORT_SERVER_PORT = 6666

@@ -159,7 +192,11 @@ java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.Write
// private[akka] def getStorageFor(storageName: String): StoreClient[String, String] =
// storageFactory.getStoreClient(storageName)

// private[akka] def startZooKeeper = {
// private[akka] def startZooKeeper = {
//import org.apache.zookeeper.jmx.ManagedUtil
//import org.apache.zookeeper.server.persistence.FileTxnSnapLog
//import org.apache.zookeeper.server.ServerConfig
//import org.apache.zookeeper.server.NIOServerCnxn
// val ZOO_KEEPER_SERVER_URL = SERVER_URL
// val ZOO_KEEPER_SERVER_PORT = 9898
// try {

@@ -191,22 +228,3 @@ java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.Write
// // if (zooKeeper.isRunning) zooKeeper.shutdown
// } catch { case e => log.fatal("Unexpected exception: s%",e) }
// }

private def getPort(defaultPort: Int) = {
val port = System.getenv("JERSEY_HTTP_PORT")
if (null != port) Integer.parseInt(port)
else defaultPort;
}
}

//import javax.ws.rs.{Produces, Path, GET}
// @GET
// @Produces("application/json")
// @Path("/network/{id: [0-9]+}/{nid}")
// def getUserByNetworkId(@PathParam {val value = "id"} id: Int, @PathParam {val value = "nid"} networkId: String): User = {
// val q = em.createQuery("SELECT u FROM User u WHERE u.networkId = :id AND u.networkUserId = :nid")
// q.setParameter("id", id)
// q.setParameter("nid", networkId)
// q.getSingleResult.asInstanceOf[User]
// }
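Since setupConfig registers the loaded configuration with JMX under "com.scalablesolutions.akka.config", the values can be inspected at runtime from jconsole or programmatically. A rough sketch using the standard platform MBeanServer; whether Configgy uses exactly this ObjectName domain, and how it lays out the attributes, is an assumption based on the string passed to registerWithJmx above:

  import java.lang.management.ManagementFactory
  import javax.management.ObjectName

  val mbeanServer = ManagementFactory.getPlatformMBeanServer
  // list every MBean Configgy registered for the akka config tree (assumed domain)
  val names = mbeanServer.queryNames(new ObjectName("com.scalablesolutions.akka.config:*"), null)
  val it = names.iterator
  while (it.hasNext) println(it.next)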
@@ -20,11 +20,11 @@ class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectEx

object Annotations {
import se.scalablesolutions.akka.annotation._
val transactional = classOf[transactional]
val oneway = classOf[oneway]
val immutable = classOf[immutable]
val prerestart = classOf[prerestart]
val postrestart = classOf[postrestart]
val oneway = classOf[oneway]
val transactionrequired = classOf[transactionrequired]
val prerestart = classOf[prerestart]
val postrestart = classOf[postrestart]
val immutable = classOf[immutable]
}

/**

@@ -259,14 +259,14 @@ sealed class ActorAroundAdvice(val target: Class[_],
*/
private[kernel] class Dispatcher extends Actor {
private val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()

makeTransactional

private[actor] var target: Option[AnyRef] = None
private var preRestart: Option[Method] = None
private var postRestart: Option[Method] = None

private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = {
if (targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactional

id = targetClass.getName
target = Some(targetInstance)
val methods = targetInstance.getClass.getDeclaredMethods.toList
@@ -34,6 +34,10 @@ class ActorMessageHandler(val actor: Actor) extends MessageHandler {
def handle(handle: MessageHandle) = actor.handle(handle)
}

object Actor {
val timeout = kernel.Kernel.config.getInt("akka.actor.timeout", 5000)
}

trait Actor extends Logging with TransactionManagement {
@volatile private[this] var isRunning: Boolean = false
private[this] val remoteFlagLock = new ReadWriteLock

@@ -59,7 +63,7 @@ trait Actor extends Logging with TransactionManagement {
*
* Defines the default timeout for '!!' invocations, e.g. the timeout for the future returned by the call to '!!'.
*/
@volatile var timeout: Long = 5000L
@volatile var timeout: Long = Actor.timeout

/**
* User overridable callback/setting.

@@ -397,8 +401,7 @@ trait Actor extends Logging with TransactionManagement {
val future = RemoteClient.clientFor(remoteAddress.get).send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, false, false, supervisorUuid))
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
}
else {
} else {
val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, Some(future), TransactionManagement.threadBoundTx.get))
future

@@ -526,5 +529,12 @@ trait Actor extends Logging with TransactionManagement {
} else None
}

private[kernel] def swapDispatcher(disp: MessageDispatcher) = {
dispatcher = disp
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageHandler(this))
}

override def toString(): String = "Actor[" + id + "]"
}
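The practical effect of the new object Actor plus the timeout change above is that the default '!!' timeout is read once from akka.actor.timeout (falling back to 5000 ms) instead of being hard-coded, while a per-instance override is still possible. A small sketch of the intent; MyWorker is a hypothetical Actor subclass and the config value is assumed to be set:

  // akka.conf (assumed): akka.actor.timeout = 3000
  val worker = new MyWorker        // hypothetical subclass of Actor
  worker.start
  println(worker.timeout)          // 3000, the configured default taken from Actor.timeout
  worker.timeout = 10000L          // override for this instance only; '!!' futures now wait up to 10 s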
@@ -4,25 +4,13 @@

package se.scalablesolutions.akka.kernel.config

import ScalaConfig.{RestartStrategy, Component}
import javax.servlet.ServletContext

import scala.collection.mutable.HashSet

import ScalaConfig.{RestartStrategy, Component}
import kernel.util.Logging

object ActiveObjectConfigurator extends Logging {

private var configuration: ActiveObjectConfigurator = _

// FIXME: cheating with only having one single, scope per ServletContext
def registerConfigurator(conf: ActiveObjectConfigurator) = {
configuration = conf
}

def getConfiguratorFor(ctx: ServletContext): ActiveObjectConfigurator = {
configuration
//configurations.getOrElse(ctx, throw new IllegalArgumentException("No configuration for servlet context [" + ctx + "]"))
}
}

trait ActiveObjectConfigurator {
/**
* Returns the active object that has been put under supervision for the class specified.

@@ -32,6 +20,8 @@ trait ActiveObjectConfigurator {
*/
def getActiveObject[T](clazz: Class[T]): T

def isActiveObjectDefined[T](clazz: Class[T]): Boolean

def getExternalDependency[T](clazz: Class[T]): T

def getComponentInterfaces: List[Class[_]]

@@ -46,3 +36,29 @@ trait ActiveObjectConfigurator {

def stop
}

object ActiveObjectConfigurator extends Logging {

private val configuration = new HashSet[ActiveObjectConfigurator]

// FIXME: cheating with only having one single, scope per ServletContext
def registerConfigurator(conf: ActiveObjectConfigurator) = synchronized {
configuration + conf
}

def getConfiguratorsFor(ctx: ServletContext): List[ActiveObjectConfigurator] = synchronized {
configuration.toList
//configurations.getOrElse(ctx, throw new IllegalArgumentException("No configuration for servlet context [" + ctx + "]"))
}
}

class ActiveObjectConfiguratorRepository extends Logging {
def registerConfigurator(conf: ActiveObjectConfigurator) = {
ActiveObjectConfigurator.registerConfigurator(conf)
}

def getConfiguratorsFor(ctx: ServletContext): List[ActiveObjectConfigurator] = {
ActiveObjectConfigurator.getConfiguratorsFor(ctx)
}
}
@@ -52,6 +52,10 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
proxy.asInstanceOf[T]
}

override def isActiveObjectDefined[T](clazz: Class[T]): Boolean = synchronized {
activeObjectRegistry.get(clazz).isDefined
}

override def getExternalDependency[T](clazz: Class[T]): T = synchronized {
injector.getInstance(clazz).asInstanceOf[T]
}

@@ -95,8 +99,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
val actor = new Dispatcher
actor.start
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None

@@ -111,8 +114,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]]()).setAccessible(true)
val actor = new Dispatcher
actor.start
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
@@ -17,7 +17,9 @@ import org.apache.camel.{Endpoint, Routes}
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
class ActiveObjectGuiceConfiguratorForJava {
val INSTANCE = new ActiveObjectGuiceConfigurator
private val INSTANCE = new ActiveObjectGuiceConfigurator

def getInstance = INSTANCE

/**
* Returns the active object that has been put under supervision for the class specified.
@@ -8,10 +8,19 @@ import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider

import kernel.config.ActiveObjectConfigurator
import kernel.util.Logging
import java.lang.reflect.{Constructor, InvocationTargetException}

class ActiveObjectComponentProvider(val clazz: Class[_], val configurator: ActiveObjectConfigurator)
class ActiveObjectComponentProvider(val clazz: Class[_], val configurators: List[ActiveObjectConfigurator])
extends IoCFullyManagedComponentProvider with Logging {

override def getInstance: AnyRef = configurator.getActiveObject(clazz).asInstanceOf[AnyRef]
override def getInstance: AnyRef = {
val instances = for {
conf <- configurators
if conf.isActiveObjectDefined(clazz)
} yield conf.getActiveObject(clazz).asInstanceOf[AnyRef]
instances match {
case instance :: Nil => instance
case Nil => throw new IllegalArgumentException("No Active Object for class [" + clazz + "] could be found. Make sure you have defined and configured the class as an Active Object in an ActiveObjectConfigurator")
case _ => throw new IllegalArgumentException("Active Object for class [" + clazz + "] is defined in more than one ActiveObjectConfigurator. Eliminate the redundancy.")
}
}
}
@@ -4,16 +4,16 @@

package se.scalablesolutions.akka.kernel.jersey

import com.sun.jersey.core.spi.component.ioc.{IoCComponentProvider, IoCComponentProviderFactory}
import com.sun.jersey.core.spi.component.{ComponentContext, ComponentProviderFactory}
import com.sun.jersey.core.spi.component.ioc.IoCComponentProviderFactory
import com.sun.jersey.core.spi.component.ComponentContext
import config.ActiveObjectConfigurator

class ActiveObjectComponentProviderFactory(val configurator: ActiveObjectConfigurator)
class ActiveObjectComponentProviderFactory(val configurators: List[ActiveObjectConfigurator])
extends IoCComponentProviderFactory {

override def getComponentProvider(clazz: Class[_]): ActiveObjectComponentProvider = getComponentProvider(null, clazz)

override def getComponentProvider(context: ComponentContext, clazz: Class[_]): ActiveObjectComponentProvider = {
new ActiveObjectComponentProvider(clazz, configurator)
new ActiveObjectComponentProvider(clazz, configurators)
}
}
@@ -4,23 +4,26 @@

package se.scalablesolutions.akka.kernel.jersey

import config.ActiveObjectConfigurator

import com.sun.jersey.api.core.{DefaultResourceConfig, ResourceConfig}
import com.sun.jersey.spi.container.servlet.ServletContainer
import com.sun.jersey.spi.container.WebApplication
import config.ActiveObjectConfigurator
import java.util.{HashSet, ArrayList}

import java.util.HashSet

class AkkaServlet extends ServletContainer {

override def initiate(rc: ResourceConfig, wa: WebApplication) = {
val configurator = ActiveObjectConfigurator.getConfiguratorFor(getServletContext);
val configurators = ActiveObjectConfigurator.getConfiguratorsFor(getServletContext);
val set = new HashSet[Class[_]]
for (c <- configurator.getComponentInterfaces) {
println("========== " + c)
set.add(c)
}
for {
conf <- configurators
clazz <- conf.getComponentInterfaces
} set.add(clazz)

wa.initiate(
new DefaultResourceConfig(set),
new ActiveObjectComponentProviderFactory(configurator));
new ActiveObjectComponentProviderFactory(configurators));
}
}
@@ -28,7 +28,7 @@ object RemoteClient extends Logging {
else {
val client = new RemoteClient(hostname, port)
client.connect
clients + hash -> client
clients += hash -> client
client
}
}
@@ -23,10 +23,9 @@ class RemoteServer extends Logging {
}

object RemoteServer extends Logging {
// FIXME make all remote server options configurable
val HOSTNAME = "localhost"
val PORT = 9999
val CONNECTION_TIMEOUT_MILLIS = 100
val HOSTNAME = kernel.Kernel.config.getString("akka.remote.hostname", "localhost")
val PORT = kernel.Kernel.config.getInt("akka.remote.port", 9999)
val CONNECTION_TIMEOUT_MILLIS = kernel.Kernel.config.getInt("akka.remote.connection-timeout", 1000)

@volatile private var isRunning = false

@@ -77,7 +76,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
//e.getChannel.write(firstMessage)
}

override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) ={
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val message = event.getMessage
if (message == null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
if (message.isInstanceOf[RemoteRequest]) handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
@@ -60,7 +60,6 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private val MILLISECONDS = TimeUnit.MILLISECONDS

private var inProcessOfBuilding = false
private var executor: ExecutorService = _

@@ -114,14 +113,15 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
override protected def doShutdown = executor.shutdownNow

private def getIfNotBusy(key: AnyRef): Option[MessageHandler] = guard.synchronized {
if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key))
else if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
busyHandlers.add(key)
Some(messageHandlers.get(key))
} else None
}

private def free(key: AnyRef) = guard.synchronized {
busyHandlers.remove(key)
if (!CONCURRENT_MODE) busyHandlers.remove(key)
}

@@ -297,16 +297,16 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
/*
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
*/
/*
def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
*/
}

/**
@@ -4,9 +4,13 @@

package se.scalablesolutions.akka.kernel.reactor

import java.util.concurrent.TimeUnit
import java.util.HashMap

trait MessageDispatcherBase extends MessageDispatcher {
val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
val MILLISECONDS = TimeUnit.MILLISECONDS

val messageQueue = new MessageQueue

@volatile protected var active: Boolean = false
@@ -5,12 +5,17 @@
package se.scalablesolutions.akka.kernel.state

import java.io.File
import java.lang.reflect.Constructor
import kernel.util.{Serializer, JavaSerializationSerializer, Logging}

import org.apache.cassandra.config.DatabaseDescriptor
import org.apache.cassandra.service._

import org.apache.thrift.server.TThreadPoolServer
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TServerSocket
import org.apache.thrift.transport.TTransportFactory
import org.apache.thrift.TProcessorFactory

/**
* NOTE: requires command line options:
* <br/>

@@ -18,18 +23,30 @@ import org.apache.cassandra.service._
* <p/>
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
final object CassandraNode extends Logging {

final object CassandraStorage extends Logging {
val TABLE_NAME = "akka"
val MAP_COLUMN_FAMILY = "map"
val VECTOR_COLUMN_FAMILY = "vector"
val REF_COLUMN_FAMILY = "ref:item"
val IS_ASCENDING = true

// TODO: make pluggable (Avro, JSON, Thrift, Protobuf etc.)
private[this] var serializer: Serializer = new JavaSerializationSerializer
val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
val BLOCKING_CALL = kernel.Kernel.config.getInt("akka.storage.cassandra.blocking", 0)

private[this] val serializer: Serializer = {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "serialization") match {
case "serialization" => new JavaSerializationSerializer
case "json" => throw new UnsupportedOperationException("json storage protocol is not yet supported")
case "avro" => throw new UnsupportedOperationException("avro storage protocol is not yet supported")
case "thrift" => throw new UnsupportedOperationException("thrift storage protocol is not yet supported")
case "protobuf" => throw new UnsupportedOperationException("protobuf storage protocol is not yet supported")
}
}

// TODO: is this server thread-safe or needed to be wrapped up in an actor?
private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]

private[this] var thriftServer: CassandraThriftServer = _

def start = {
try {

@@ -40,9 +57,16 @@ final object CassandraNode extends Logging {
log.error("Could not start up persistent storage")
throw e
}
if (RUN_THRIFT_SERVICE) {
thriftServer = new CassandraThriftServer(server)
thriftServer.start
}
}

def stop = {}
def stop = {
//server.storageService.shutdown
if (RUN_THRIFT_SERVICE) thriftServer.stop
}

// ===============================================================
// For Ref

@@ -55,7 +79,7 @@ final object CassandraNode extends Logging {
REF_COLUMN_FAMILY,
serializer.out(element),
System.currentTimeMillis,
false) // FIXME: what is this flag for?
BLOCKING_CALL)
}

def getRefStorageFor(name: String): Option[AnyRef] = {

@@ -80,7 +104,7 @@ final object CassandraNode extends Logging {
VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
serializer.out(element),
System.currentTimeMillis,
false) // FIXME: what is this flag for?
BLOCKING_CALL)
}

def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {

@@ -95,7 +119,7 @@ final object CassandraNode extends Logging {
}

def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, start, count)
server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)

def getVectorStorageSizeFor(name: String): Int =

@@ -112,7 +136,7 @@ final object CassandraNode extends Logging {
MAP_COLUMN_FAMILY + ":" + key,
serializer.out(value),
System.currentTimeMillis,
false) // FIXME: what is this flag for?
BLOCKING_CALL)
}

def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {

@@ -127,7 +151,7 @@ final object CassandraNode extends Logging {
TABLE_NAME,
name,
columns),
false) // non-blocking
BLOCKING_CALL)
}

def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {

@@ -154,28 +178,20 @@ final object CassandraNode extends Logging {
server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY)

def removeMapStorageFor(name: String) =
server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, false)
server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)

def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] =
server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, start, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
}

/*
* This code is only for starting up the Cassandra Thrift server, perhaps later

import scala.actors.Actor._

import com.facebook.thrift.protocol.TBinaryProtocol
import com.facebook.thrift.protocol.TProtocolFactory
import com.facebook.thrift.server.TThreadPoolServer
import com.facebook.thrift.transport.TServerSocket
import com.facebook.thrift.transport.TTransportException
import com.facebook.thrift.transport.TTransportFactory
import com.facebook.thrift.TProcessorFactory
class CassandraThriftServer(server: CassandraServer) extends Logging {
case object Start
case object Stop

private[this] val serverEngine: TThreadPoolServer = try {
val pidFile = System.getProperty("pidfile")
val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
if (pidFile != null) new File(pidFile).deleteOnExit();
val listenPort = DatabaseDescriptor.getThriftPort

@@ -197,17 +213,19 @@ import com.facebook.thrift.TProcessorFactory
log.error("Could not start up persistent storage node.")
throw e
}

import scala.actors.Actor._
private[this] val serverDaemon = actor {
receive {
case Start =>
log.info("Persistent storage node starting up...")
case Start =>
log.info("Cassandra thrift service is starting up...")
serverEngine.serve
case Stop =>
log.info("Persistent storage node shutting down...")
case Stop =>
log.info("Cassandra thrift service is shutting down...")
serverEngine.stop
//case Insert(..) =>
// server.
}
}
*/

def start = serverDaemon ! Start
def stop = serverDaemon ! Stop
}
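The renamed CassandraStorage object is used below as a plain keyed-storage facade by the persistent transactional map, vector and ref. A minimal usage sketch against the signatures visible in this file; it assumes the kernel has already called CassandraStorage.start and that Cassandra's storage-config is in place:

  CassandraStorage.insertMapStorageEntryFor("my-map", "key-1", "value-1")
  CassandraStorage.getMapStorageEntryFor("my-map", "key-1")   // Option[AnyRef], Some(...) once the entry is stored
  CassandraStorage.getMapStorageSizeFor("my-map")             // number of entries in the "map" column family row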
@@ -186,7 +186,7 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
}

/**
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
*
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/

@@ -194,43 +194,44 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str

override def getRange(start: Int, count: Int) = {
verifyTransaction
CassandraNode.getMapStorageRangeFor(uuid, start, count)
CassandraStorage.getMapStorageRangeFor(uuid, start, count)
}

// ---- For Transactional ----
override def commit = {
CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList)
// FIXME: should use batch function once the bug is resolved
for (entry <- changeSet) {
val (key, value) = entry
CassandraNode.insertMapStorageEntryFor(uuid, key, value)
}
// for (entry <- changeSet) {
// val (key, value) = entry
// CassandraStorage.insertMapStorageEntryFor(uuid, key, value)
// }
}

// ---- Overriding scala.collection.mutable.Map behavior ----
override def clear = {
verifyTransaction
CassandraNode.removeMapStorageFor(uuid)
CassandraStorage.removeMapStorageFor(uuid)
}
override def contains(key: String): Boolean = {
verifyTransaction
CassandraNode.getMapStorageEntryFor(uuid, key).isDefined
CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
}
override def size: Int = {
verifyTransaction
CassandraNode.getMapStorageSizeFor(uuid)
CassandraStorage.getMapStorageSizeFor(uuid)
}

// ---- For scala.collection.mutable.Map ----
override def get(key: String): Option[AnyRef] = {
verifyTransaction
val result = CassandraNode.getMapStorageEntryFor(uuid, key)
val result = CassandraStorage.getMapStorageEntryFor(uuid, key)
result
}

override def elements: Iterator[Tuple2[String, AnyRef]] = {
//verifyTransaction
new Iterator[Tuple2[String, AnyRef]] {
private val originalList: List[Tuple2[String, AnyRef]] = CassandraNode.getMapStorageFor(uuid)
private val originalList: List[Tuple2[String, AnyRef]] = CassandraStorage.getMapStorageFor(uuid)
private var elements = originalList.reverse
override def next: Tuple2[String, AnyRef]= synchronized {
val element = elements.head

@@ -335,15 +336,15 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
// ---- For TransactionalVector ----
override def get(index: Int): AnyRef = {
verifyTransaction
CassandraNode.getVectorStorageEntryFor(uuid, index)
CassandraStorage.getVectorStorageEntryFor(uuid, index)
}
override def getRange(start: Int, count: Int): List[AnyRef] = {
verifyTransaction
CassandraNode.getVectorStorageRangeFor(uuid, start, count)
CassandraStorage.getVectorStorageRangeFor(uuid, start, count)
}
override def length: Int = {
verifyTransaction
CassandraNode.getVectorStorageSizeFor(uuid)
CassandraStorage.getVectorStorageSizeFor(uuid)
}
override def apply(index: Int): AnyRef = get(index)
override def first: AnyRef = get(0)

@@ -358,7 +359,7 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
override def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- changeSet) {
CassandraNode.insertVectorStorageEntryFor(uuid, element)
CassandraStorage.insertVectorStorageEntryFor(uuid, element)
}
}
}

@@ -397,11 +398,11 @@ class TransactionalRef[T] extends Transactional {
}

class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
override def commit = if (ref.isDefined) CassandraNode.insertRefStorageFor(uuid, ref.get)
override def commit = if (ref.isDefined) CassandraStorage.insertRefStorageFor(uuid, ref.get)

override def get: Option[AnyRef] = {
verifyTransaction
CassandraNode.getRefStorageFor(uuid)
CassandraStorage.getRefStorageFor(uuid)
}
override def isDefined: Boolean = get.isDefined
override def getOrElse(default: => AnyRef): AnyRef = {
@@ -14,7 +14,7 @@ class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Tran
}

object TransactionManagement {
private val txEnabled = new AtomicBoolean(true)
private val txEnabled = new AtomicBoolean(kernel.Kernel.config.getBool("akka.stm.service", true))

def isTransactionalityEnabled = txEnabled.get
def disableTransactions = txEnabled.set(false)
kernel/src/main/scala/stm/TransactionWatcher.scala (new file, 105 lines)

@@ -0,0 +1,105 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/

package se.scalablesolutions.akka.kernel.stm

/*
import kernel.util.Logging
import org.apache.zookeeper.jmx.ManagedUtil
import org.apache.zookeeper.server.persistence.FileTxnSnapLog
import org.apache.zookeeper.server.{ServerConfig, NIOServerCnxn}
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher, ZooKeeper, DataMonitor}
*/
/**
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*
class TransactionWatcher extends Logging with Watcher {

val SERVER_URL = "localhost"

val ZOO_KEEPER_URL = SERVER_URL
val ZOO_KEEPER_PORT = 2181
val znode = "master"

private[this] val db = new scala.collection.mutable.HashMap[String, String]

private[this] val zk = new ZooKeeper(ZOO_KEEPER_URL + ":" + ZOO_KEEPER_PORT, 3000, this)
private[this] val dm = new DataMonitor(zk, znode, null, this)

override def process(event: WatchedEvent) = {
log.debug("New ZooKeeper event: %s", event)
val path = event.getPath();
if (event.getType == Event.EventType.None) {
// We are being told that the state of the connection has changed
event.getState match {
case SyncConnected =>
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
case Expired =>
dead = true
listener.closing(KeeperException.Code.SessionExpired)
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null)
}
}
if (chainedWatcher != null) chainedWatcher.process(event);
}

def run: Unit = synchronized {
try {
while (!dm.dead) wait
} catch {
case e: InterruptedException => Thread.currentThread.interrupt
}
}

def closing(rc: Int): Unit = synchronized { notifyAll() }
}

*/
object TransactionWatcher {
def main(args: Array[String]): Unit = {
println("Connecting to ZooKeeper...")
//new TransactionWatcher
}
}

// private[akka] def startZooKeeper = {
// try {
// ManagedUtil.registerLog4jMBeans
// ServerConfig.parse(args)
// } catch {
// case e: JMException => log.warning("Unable to register log4j JMX control: s%", e)
// case e => log.fatal("Error in ZooKeeper config: s%", e)
// }
// val factory = new ZooKeeperServer.Factory() {
// override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort)
// override def createServer = {
// val server = new ZooKeeperServer
// val txLog = new FileTxnSnapLog(
// new File(ServerConfig.getDataLogDir),
// new File(ServerConfig.getDataDir))
// server.setTxnLogFactory(txLog)
// server
// }
// }
// try {
// val zooKeeper = factory.createServer
// zooKeeper.startup
// log.info("ZooKeeper started")
// // TODO: handle clean shutdown as below in separate thread
// // val cnxnFactory = serverFactory.createConnectionFactory
// // cnxnFactory.setZooKeeperServer(zooKeeper)
// // cnxnFactory.join
// // if (zooKeeper.isRunning) zooKeeper.shutdown
// } catch { case e => log.fatal("Unexpected exception: s%",e) }
// }
kernel/src/main/scala/util/Logging.scala (2 changes, Executable file → Normal file)

@@ -22,7 +22,7 @@ import java.net.UnknownHostException;
trait Logging {
@transient var log = {
val log = Logger.get(this.getClass.getName)
log.setLevel(Level.ALL)
//log.setLevel(Level.ALL)
log
}
}
@@ -90,7 +90,7 @@ class InMemoryActorSpec extends TestCase {
stateful.start
stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
Thread.sleep(100)
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
Thread.sleep(100)
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}

@@ -100,7 +100,7 @@ class InMemoryActorSpec extends TestCase {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}

@@ -112,7 +112,7 @@ class InMemoryActorSpec extends TestCase {
failer.start
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
Thread.sleep(100)
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
Thread.sleep(100)
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}

@@ -125,7 +125,7 @@ class InMemoryActorSpec extends TestCase {
val failer = new InMemFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state

@@ -137,7 +137,7 @@ class InMemoryActorSpec extends TestCase {
stateful.start
stateful ! SetVectorStateOneWay("init") // set init state
Thread.sleep(100)
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
Thread.sleep(100)
assertEquals("new state", (stateful !! GetVectorState).get)
}

@@ -147,7 +147,7 @@ class InMemoryActorSpec extends TestCase {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetVectorState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetVectorState).get)
}

@@ -159,7 +159,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(100)
val failer = new InMemFailerActor
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
Thread.sleep(100)
assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state
}

@@ -172,7 +172,7 @@ class InMemoryActorSpec extends TestCase {
val failer = new InMemFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state

@@ -184,7 +184,7 @@ class InMemoryActorSpec extends TestCase {
stateful.start
stateful ! SetRefStateOneWay("init") // set init state
Thread.sleep(100)
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
Thread.sleep(100)
assertEquals("new state", (stateful !! GetRefState).get)
}

@@ -194,7 +194,7 @@ class InMemoryActorSpec extends TestCase {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetRefState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetRefState).get)
}

@@ -206,7 +206,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(100)
val failer = new InMemFailerActor
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
Thread.sleep(100)
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}

@@ -219,7 +219,7 @@ class InMemoryActorSpec extends TestCase {
val failer = new InMemFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
@@ -72,7 +72,7 @@ class PersistentActorSpec extends TestCase {
val stateful = new PersistentActor
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}

@@ -84,7 +84,7 @@ class PersistentActorSpec extends TestCase {
val failer = new PersistentFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state

@@ -95,7 +95,7 @@ class PersistentActorSpec extends TestCase {
val stateful = new PersistentActor
stateful.start
stateful !! SetVectorState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetVectorState).get)
}

@@ -107,7 +107,7 @@ class PersistentActorSpec extends TestCase {
val failer = new PersistentFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state

@@ -118,7 +118,7 @@ class PersistentActorSpec extends TestCase {
val stateful = new PersistentActor
stateful.start
stateful !! SetRefState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetRefState).get)
}

@@ -130,7 +130,7 @@ class PersistentActorSpec extends TestCase {
val failer = new PersistentFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
@@ -27,6 +27,7 @@ class RemoteActorSpecActorBidirectional extends Actor {

class RemoteActorSpec extends TestCase {

kernel.Kernel.config
new Thread(new Runnable() {
def run = {
val server = new RemoteServer
@@ -22,6 +22,7 @@ object Log {
@RunWith(classOf[JUnit4Runner])
class RemoteSupervisorSpec extends Suite {

Kernel.config
new Thread(new Runnable() {
def run = {
val server = new RemoteServer

@@ -244,6 +245,7 @@ class RemoteSupervisorSpec extends Suite {
}
}

/*
def testOneWayKillSingleActorOneForOne = {
Log.messageLog = ""
val sup = getSingleActorOneForOneSupervisor

@@ -277,7 +279,8 @@ class RemoteSupervisorSpec extends Suite {
Log.oneWayLog
}
}

*/

/*
def testOneWayKillSingleActorAllForOne = {
Log.messageLog = ""