merge with master

This commit is contained in:
Jonas Boner 2009-08-02 16:18:35 +02:00
commit e18a3a6415
50 changed files with 953 additions and 618 deletions

View file

@ -14,7 +14,7 @@ import java.net.URLClassLoader
import net.lag.configgy.{Config, Configgy, RuntimeEnvironment}
import kernel.jersey.AkkaServlet
import kernel.jersey.AkkaCometServlet
import kernel.nio.RemoteServer
import kernel.state.CassandraStorage
import kernel.util.Logging
@ -39,7 +39,6 @@ object Kernel extends Logging {
val REST_URL = "http://" + REST_HOSTNAME
val REST_PORT = kernel.Kernel.config.getInt("akka.rest.port", 9998)
// FIXME add API to shut server down gracefully
private var remoteServer: RemoteServer = _
private var jerseySelectorThread: SelectorThread = _
@ -92,8 +91,8 @@ object Kernel extends Logging {
private[akka] def runApplicationBootClasses = {
val HOME = try { System.getenv("AKKA_HOME") } catch { case e: NullPointerException => throw new IllegalStateException("AKKA_HOME system variable needs to be set. Should point to the root of the Akka distribution.") }
val CLASSES = HOME + "/kernel/target/classes" // FIXME remove for dist
val LIB = HOME + "/lib"
//val CLASSES = HOME + "/kernel/target/classes" // FIXME remove for dist
//val LIB = HOME + "/lib"
val CONFIG = HOME + "/config"
val DEPLOY = HOME + "/deploy"
val DEPLOY_DIR = new File(DEPLOY)
@ -119,24 +118,32 @@ object Kernel extends Logging {
private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) {
System.setProperty("cassandra", "")
System.setProperty("storage-config", akka.Boot.CONFIG + "/")
CassandraStorage.start
CassandraStorage.start
}
private[akka] def startJersey = {
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
val adapter = new ServletAdapter
val servlet = new AkkaServlet
adapter.setServletInstance(servlet)
adapter.setContextPath(uri.getPath)
val scheme = uri.getScheme
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException("The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
val adapter = new ServletAdapter
adapter.setHandleStaticResources(true)
adapter.setServletInstance(new AkkaCometServlet)
adapter.setContextPath(uri.getPath)
adapter.setRootFolder(System.getenv("AKKA_HOME") + "/deploy/root")
log.info("REST service root path: [" + adapter.getRootFolder + "] and context path [" + adapter.getContextPath + "] ")
val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
jerseySelectorThread = new SelectorThread
jerseySelectorThread.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
jerseySelectorThread.setPort(REST_PORT)
jerseySelectorThread.setAdapter(adapter)
jerseySelectorThread.setEnableAsyncExecution(true)
jerseySelectorThread.setAsyncHandler(ah)
jerseySelectorThread.listen
log.info("REST service started successfully. Listening to port [" + REST_PORT + "]")
}

View file

@ -96,7 +96,7 @@ object Actor {
* </pre>
*/
protected[kernel] var dispatcher: MessageDispatcher = {
val dispatcher = new EventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
dispatcher

View file

@ -4,17 +4,22 @@
package se.scalablesolutions.akka.kernel.jersey
import com.sun.jersey.core.spi.component.ioc.IoCComponentProviderFactory
import com.sun.jersey.core.spi.component.ComponentContext
import kernel.Kernel
import util.Logging
import javax.ws.rs.core.Context
import com.sun.jersey.core.spi.component.ioc.{IoCComponentProvider,IoCComponentProviderFactory}
import com.sun.jersey.core.spi.component.{ComponentContext}
import config.Configurator
class ActorComponentProviderFactory(val configurators: List[Configurator])
extends IoCComponentProviderFactory {
extends IoCComponentProviderFactory with Logging {
override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz)
override def getComponentProvider(clazz: Class[_]): ActorComponentProvider = getComponentProvider(null, clazz)
override def getComponentProvider(context: ComponentContext, clazz: Class[_]): ActorComponentProvider = {
new ActorComponentProvider(clazz, configurators)
override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = {
//log.info("ProviderFactory: resolve => " + clazz.getName)
configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
}
}

View file

@ -6,26 +6,81 @@ package se.scalablesolutions.akka.kernel.jersey
import kernel.Kernel
import config.ConfiguratorRepository
import util.Logging
import com.sun.jersey.api.core.{DefaultResourceConfig, ResourceConfig}
import com.sun.jersey.api.core.ResourceConfig
import com.sun.jersey.spi.container.servlet.ServletContainer
import com.sun.jersey.spi.container.WebApplication
import java.util.HashSet
import javax.servlet.{ServletConfig}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
class AkkaServlet extends ServletContainer {
import org.atmosphere.cpr.{AtmosphereServletProcessor, AtmosphereEvent}
import org.atmosphere.cpr.AtmosphereServlet.AtmosphereHandlerWrapper
import org.atmosphere.container.GrizzlyCometSupport
import org.atmosphere.handler.ReflectorServletProcessor
import org.atmosphere.core.{JerseyBroadcaster}
import java.net.URLClassLoader
import java.io.InputStream
import scala.collection.jcl.Conversions._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AkkaServlet extends ServletContainer with AtmosphereServletProcessor with Logging {
override def initiate(rc: ResourceConfig, wa: WebApplication) = {
Kernel.boot // will boot if not already booted by 'main'
val configurators = ConfiguratorRepository.getConfiguratorsFor(getServletContext);
val set = new HashSet[Class[_]]
for {
conf <- configurators
clazz <- conf.getComponentInterfaces
} set.add(clazz)
val configurators = ConfiguratorRepository.getConfiguratorsFor(getServletContext)
wa.initiate(
new DefaultResourceConfig(set),
new ActorComponentProviderFactory(configurators));
rc.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces))
log.info("Starting AkkaServlet with ResourceFilters: " + rc.getProperty("com.sun.jersey.spi.container.ResourceFilters"));
rc.getProperties.put("com.sun.jersey.spi.container.ResourceFilters", "org.atmosphere.core.AtmosphereFilter")
//rc.getFeatures.put("com.sun.jersey.config.feature.Redirect", true)
//rc.getFeatures.put("com.sun.jersey.config.feature.ImplicitViewables",true)
wa.initiate(rc, new ActorComponentProviderFactory(configurators))
}
}
// Borrowed from AbstractReflectorAtmosphereHandler
override def onMessage(event: AtmosphereEvent[HttpServletRequest, HttpServletResponse]): AtmosphereEvent[_, _] = {
if (event.getMessage ne null) {
val isUsingStream = try {
event.getResponse.getWriter
false
} catch {case e: IllegalStateException => true}
val data = event.getMessage.toString
if (isUsingStream) {
if (data != null) event.getResponse.getOutputStream.write(data.getBytes)
event.getResponse.getOutputStream.flush
} else {
event.getResponse.getWriter.write(data)
event.getResponse.getWriter.flush
}
} else log.info("Null event message :/ req[%s] res[%s]", event.getRequest, event.getResponse)
event
}
override def onEvent(event: AtmosphereEvent[HttpServletRequest, HttpServletResponse]): AtmosphereEvent[_, _] = {
event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_EVENT, event)
event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_HANDLER, this)
service(event.getRequest, event.getResponse)
event
}
}
class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet {
override def init(sconf: ServletConfig) = {
val servlet = new AkkaServlet
config = new AtmosphereConfig { ah = servlet }
atmosphereHandlers.put("", new AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster))
setCometSupport(new GrizzlyCometSupport(config))
getCometSupport.init(sconf)
servlet.init(sconf)
}
override def loadAtmosphereDotXml(is: InputStream, urlc: URLClassLoader) = () //Hide it
}

View file

@ -50,6 +50,7 @@ class DispatcherFactory {
* Has a fluent builder interface for configuring its semantics.
*/
def newEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher
def newConcurrentEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher(true)
/**
* Creates an event based dispatcher serving multiple (millions) of actors through a single thread.

View file

@ -56,11 +56,12 @@ import java.util.{Collection, HashSet, HashMap, LinkedList, List}
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extends MessageDispatcherBase {
def this() = this(false)
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private var inProcessOfBuilding = false
private var executor: ExecutorService = _
private var threadPoolBuilder: ThreadPoolExecutor = _
@ -117,7 +118,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
val iterator = invocations.iterator
while (iterator.hasNext) {
val invocation = iterator.next
if (CONCURRENT_MODE) {
if (concurrentMode) {
val invoker = messageHandlers.get(invocation.sender)
if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null")
@ -135,7 +136,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
}
private def free(invoker: AnyRef) = guard.synchronized {
if (!CONCURRENT_MODE) busyInvokers.remove(invoker)
if (!concurrentMode) busyInvokers.remove(invoker)
}
// ============ Code for configuration of thread pool =============

View file

@ -9,9 +9,8 @@ import java.util.concurrent.TimeUnit
import java.util.HashMap
trait MessageDispatcherBase extends MessageDispatcher {
val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
//val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
val MILLISECONDS = TimeUnit.MILLISECONDS
val queue = new ReactiveMessageQueue
@volatile protected var active: Boolean = false

View file

@ -41,8 +41,6 @@ class MessageInvocation(val sender: AnyRef,
var result = HashCode.SEED
result = HashCode.hash(result, sender)
result = HashCode.hash(result, message)
result = if (future.isDefined) HashCode.hash(result, future.get) else result
result = if (tx.isDefined) HashCode.hash(result, tx.get.id) else result
result
}
@ -50,11 +48,7 @@ class MessageInvocation(val sender: AnyRef,
that != null &&
that.isInstanceOf[MessageInvocation] &&
that.asInstanceOf[MessageInvocation].sender == sender &&
that.asInstanceOf[MessageInvocation].message == message &&
that.asInstanceOf[MessageInvocation].future.isDefined == future.isDefined &&
that.asInstanceOf[MessageInvocation].future.get == future.get &&
that.asInstanceOf[MessageInvocation].tx.isDefined == tx.isDefined &&
that.asInstanceOf[MessageInvocation].tx.get.id == tx.get.id
that.asInstanceOf[MessageInvocation].message == message
override def toString(): String = "MessageInvocation[message = " + message + ", sender = " + sender + ", future = " + future + ", tx = " + tx + "]"
}

View file

@ -44,9 +44,10 @@ object CassandraStorage extends Logging {
kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "java") match {
case "scala-json" => Serializer.ScalaJSON
case "java-json" => Serializer.JavaJSON
//case "sbinary" => Serializer.SBinary
case "protobuf" => Serializer.Protobuf
case "java" => Serializer.Java
case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported")
case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
}
}

View file

@ -37,7 +37,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = new EventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@ -76,7 +76,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@ -121,7 +121,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)