hacking in servlet 3.0 support using embedded jetty-8 (remove atmo, hbase, volde to get around jar mismatch); wip
This commit is contained in:
parent
fece2f1990
commit
b53ec3c980
6 changed files with 83 additions and 143 deletions
|
|
@ -88,6 +88,7 @@ class AkkaHttpServlet extends HttpServlet with Logging
|
|||
|
||||
case (3,0) => {
|
||||
log.info("Supporting Java asynchronous contexts.")
|
||||
_factory = Some(Servlet30ContextMethodFactory)
|
||||
}
|
||||
|
||||
case _ if (server.toLowerCase startsWith JettyServer) => {
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import Types._
|
|||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait JettyContinuation extends ContinuationListener with akka.util.Logging
|
||||
trait JettyContinuation extends ContinuationListener with akka.util.Logging
|
||||
{
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
import AkkaHttpServlet._
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ trait RequestMethod extends Logging
|
|||
}
|
||||
catch
|
||||
{
|
||||
case io:IOException => log.error(io, "Failed to write data to connection on resume - the client probably disconnected")
|
||||
case io => log.error(io, "Failed to write data to connection on resume - the client probably disconnected")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
80
akka-http/src/main/scala/Servlet30Context.scala
Normal file
80
akka-http/src/main/scala/Servlet30Context.scala
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent};
|
||||
import Types._
|
||||
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait Servlet30Context extends AsyncListener with akka.util.Logging
|
||||
{
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
import AkkaHttpServlet._
|
||||
|
||||
val builder:()=>tAsyncRequestContext
|
||||
val context:Option[tAsyncRequestContext] = Some(builder())
|
||||
def go = {context.isDefined}
|
||||
|
||||
protected val _ac:AsyncContext = {
|
||||
val ac = context.get.asInstanceOf[AsyncContext]
|
||||
ac.setTimeout(DefaultTimeout)
|
||||
ac.addListener(this)
|
||||
ac
|
||||
}
|
||||
|
||||
def suspended:Boolean = true
|
||||
|
||||
def timeout(ms:Long):Boolean =
|
||||
{
|
||||
try {
|
||||
_ac.setTimeout(ms)
|
||||
true
|
||||
}
|
||||
catch {
|
||||
case ex:IllegalStateException => {
|
||||
|
||||
log.info("Cannot update timeout - already returned to container")
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// AsyncListener
|
||||
//
|
||||
|
||||
def onComplete(e:AsyncEvent) = {}
|
||||
def onError(e:AsyncEvent) =
|
||||
{
|
||||
e.getThrowable match {
|
||||
case null => log.warning("Error occured...")
|
||||
case t => log.warning(t, "Error occured")
|
||||
}
|
||||
}
|
||||
def onStartAsync(e:AsyncEvent) = {}
|
||||
def onTimeout(e:AsyncEvent) =
|
||||
{
|
||||
e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue)
|
||||
e.getAsyncContext.complete
|
||||
}
|
||||
}
|
||||
|
||||
object Servlet30ContextMethodFactory extends RequestMethodFactory
|
||||
{
|
||||
def Delete(f:(()=>tAsyncRequestContext)):RequestMethod = {new Delete(f) with Servlet30Context}
|
||||
def Get(f:(()=>tAsyncRequestContext)):RequestMethod = {new Get(f) with Servlet30Context}
|
||||
def Head(f:(()=>tAsyncRequestContext)):RequestMethod = {new Head(f) with Servlet30Context}
|
||||
def Options(f:(()=>tAsyncRequestContext)):RequestMethod = {new Options(f) with Servlet30Context}
|
||||
def Post(f:(()=>tAsyncRequestContext)):RequestMethod = {new Post(f) with Servlet30Context}
|
||||
def Put(f:(()=>tAsyncRequestContext)):RequestMethod = {new Put(f) with Servlet30Context}
|
||||
def Trace(f:(()=>tAsyncRequestContext)):RequestMethod = {new Trace(f) with Servlet30Context}
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -1,40 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.comet
|
||||
|
||||
import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource}
|
||||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.Actor
|
||||
import akka.dispatch.Dispatchers
|
||||
import org.atmosphere.jersey.util.JerseyBroadcasterUtil
|
||||
|
||||
object AkkaBroadcaster {
|
||||
val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
|
||||
|
||||
type Event = AtmosphereResourceEvent[_,_]
|
||||
type Resource = AtmosphereResource[_,_]
|
||||
}
|
||||
|
||||
class AkkaBroadcaster extends org.atmosphere.jersey.util.JerseySimpleBroadcaster {
|
||||
import AkkaBroadcaster._
|
||||
|
||||
//FIXME should be supervised
|
||||
lazy val caster = actorOf(new Actor {
|
||||
self.dispatcher = broadcasterDispatcher
|
||||
def receive = {
|
||||
case (r: Resource,e: Event) => JerseyBroadcasterUtil.broadcast(r,e)
|
||||
}
|
||||
}).start
|
||||
|
||||
override def destroy {
|
||||
super.destroy
|
||||
caster.stop
|
||||
}
|
||||
|
||||
protected override def broadcast(r: Resource, e : Event) {
|
||||
caster ! ((r,e))
|
||||
}
|
||||
}
|
||||
|
|
@ -1,101 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.comet
|
||||
|
||||
import akka.util.Logging
|
||||
|
||||
import java.util.{List => JList}
|
||||
import javax.servlet.{ServletConfig,ServletContext}
|
||||
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
|
||||
import com.sun.jersey.spi.container.servlet.ServletContainer
|
||||
|
||||
import org.atmosphere.container.GrizzlyCometSupport
|
||||
import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
|
||||
import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
|
||||
|
||||
class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProcessor {
|
||||
//Delegate to implement the behavior for AtmosphereHandler
|
||||
private val handler = new AbstractReflectorAtmosphereHandler {
|
||||
override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
|
||||
if (event ne null) {
|
||||
event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, event)
|
||||
event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_HANDLER, this)
|
||||
service(event.getRequest, event.getResponse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onStateChange(event: AtmosphereResourceEvent[HttpServletRequest, HttpServletResponse]) {
|
||||
if (event ne null) handler onStateChange event
|
||||
}
|
||||
|
||||
override def onRequest(resource: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
|
||||
handler onRequest resource
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a
|
||||
* standard servlet container, e.g. not using the Akka Kernel.
|
||||
* <p/>
|
||||
* Used by the Akka Kernel to bootstrap REST and Comet.
|
||||
*/
|
||||
class AkkaServlet extends AtmosphereServlet {
|
||||
import akka.config.Config.{config => c}
|
||||
|
||||
/*
|
||||
* Configure Atmosphere and Jersey (default, fall-back values)
|
||||
*/
|
||||
addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true")
|
||||
addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName)
|
||||
addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true")
|
||||
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
|
||||
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
|
||||
|
||||
c.getInt("akka.rest.maxInactiveActivity") foreach { value => addInitParameter(CometSupport.MAX_INACTIVE,value.toString) }
|
||||
c.getString("akka.rest.cometSupport") foreach { value => addInitParameter("cometSupport",value) }
|
||||
|
||||
/*
|
||||
* Provide a fallback for default values
|
||||
*/
|
||||
override def getInitParameter(key : String) =
|
||||
Option(super.getInitParameter(key)).getOrElse(initParams get key)
|
||||
|
||||
/*
|
||||
* Provide a fallback for default values
|
||||
*/
|
||||
override def getInitParameterNames() = {
|
||||
import scala.collection.JavaConversions._
|
||||
initParams.keySet.iterator ++ super.getInitParameterNames
|
||||
}
|
||||
|
||||
/**
|
||||
* We override this to avoid Atmosphere looking for it's atmosphere.xml file
|
||||
* Instead we specify what semantics we want in code.
|
||||
*/
|
||||
override def loadConfiguration(sc: ServletConfig) {
|
||||
config.setSupportSession(false)
|
||||
isBroadcasterSpecified = true
|
||||
|
||||
//The bridge between Atmosphere and Jersey
|
||||
val servlet = new AtmosphereRestServlet {
|
||||
//These are needed to make sure that Jersey is reading the config from the outer servlet
|
||||
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
|
||||
override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
|
||||
}
|
||||
|
||||
addAtmosphereHandler("/*", servlet, new AkkaBroadcaster)
|
||||
}
|
||||
|
||||
override lazy val createCometSupportResolver: CometSupportResolver = new DefaultCometSupportResolver(config) {
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
lazy val desiredCometSupport =
|
||||
Option(AkkaServlet.this.getInitParameter("cometSupport")) filter testClassExists map newCometSupport
|
||||
|
||||
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] =
|
||||
desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault))
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue