diff --git a/akka-http/src/main/scala/AkkaHttpServlet.scala b/akka-http/src/main/scala/AkkaHttpServlet.scala index 61f2070ab0..12f48addfc 100644 --- a/akka-http/src/main/scala/AkkaHttpServlet.scala +++ b/akka-http/src/main/scala/AkkaHttpServlet.scala @@ -88,6 +88,7 @@ class AkkaHttpServlet extends HttpServlet with Logging case (3,0) => { log.info("Supporting Java asynchronous contexts.") + _factory = Some(Servlet30ContextMethodFactory) } case _ if (server.toLowerCase startsWith JettyServer) => { diff --git a/akka-http/src/main/scala/JettyContinuation.scala b/akka-http/src/main/scala/JettyContinuation.scala index 6c753382f2..ff0355a82a 100644 --- a/akka-http/src/main/scala/JettyContinuation.scala +++ b/akka-http/src/main/scala/JettyContinuation.scala @@ -14,7 +14,7 @@ import Types._ /** * @author Garrick Evans */ -trait JettyContinuation extends ContinuationListener with akka.util.Logging +trait JettyContinuation extends ContinuationListener with akka.util.Logging { import javax.servlet.http.HttpServletResponse import AkkaHttpServlet._ diff --git a/akka-http/src/main/scala/RequestMethod.scala b/akka-http/src/main/scala/RequestMethod.scala index ba86336521..e475d86714 100644 --- a/akka-http/src/main/scala/RequestMethod.scala +++ b/akka-http/src/main/scala/RequestMethod.scala @@ -101,7 +101,7 @@ trait RequestMethod extends Logging } catch { - case io:IOException => log.error(io, "Failed to write data to connection on resume - the client probably disconnected") + case io => log.error(io, "Failed to write data to connection on resume - the client probably disconnected") } } diff --git a/akka-http/src/main/scala/Servlet30Context.scala b/akka-http/src/main/scala/Servlet30Context.scala new file mode 100644 index 0000000000..5d48118609 --- /dev/null +++ b/akka-http/src/main/scala/Servlet30Context.scala @@ -0,0 +1,80 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package akka.http + +import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent}; +import Types._ + + +/** + * @author Garrick Evans + */ +trait Servlet30Context extends AsyncListener with akka.util.Logging +{ + import javax.servlet.http.HttpServletResponse + import AkkaHttpServlet._ + + val builder:()=>tAsyncRequestContext + val context:Option[tAsyncRequestContext] = Some(builder()) + def go = {context.isDefined} + + protected val _ac:AsyncContext = { + val ac = context.get.asInstanceOf[AsyncContext] + ac.setTimeout(DefaultTimeout) + ac.addListener(this) + ac + } + + def suspended:Boolean = true + + def timeout(ms:Long):Boolean = + { + try { + _ac.setTimeout(ms) + true + } + catch { + case ex:IllegalStateException => { + + log.info("Cannot update timeout - already returned to container") + false + } + } + } + + // + // AsyncListener + // + + def onComplete(e:AsyncEvent) = {} + def onError(e:AsyncEvent) = + { + e.getThrowable match { + case null => log.warning("Error occured...") + case t => log.warning(t, "Error occured") + } + } + def onStartAsync(e:AsyncEvent) = {} + def onTimeout(e:AsyncEvent) = + { + e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue) + e.getAsyncContext.complete + } +} + +object Servlet30ContextMethodFactory extends RequestMethodFactory +{ + def Delete(f:(()=>tAsyncRequestContext)):RequestMethod = {new Delete(f) with Servlet30Context} + def Get(f:(()=>tAsyncRequestContext)):RequestMethod = {new Get(f) with Servlet30Context} + def Head(f:(()=>tAsyncRequestContext)):RequestMethod = {new Head(f) with Servlet30Context} + def Options(f:(()=>tAsyncRequestContext)):RequestMethod = {new Options(f) with Servlet30Context} + def Post(f:(()=>tAsyncRequestContext)):RequestMethod = {new Post(f) with Servlet30Context} + def Put(f:(()=>tAsyncRequestContext)):RequestMethod = {new Put(f) with Servlet30Context} + def Trace(f:(()=>tAsyncRequestContext)):RequestMethod = {new Trace(f) with Servlet30Context} +} + + diff --git a/akka-http/src/main/scala/akka/AkkaBroadcaster.scala b/akka-http/src/main/scala/akka/AkkaBroadcaster.scala deleted file mode 100644 index fd0f76631a..0000000000 --- a/akka-http/src/main/scala/akka/AkkaBroadcaster.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.comet - -import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource} - -import akka.actor.Actor._ -import akka.actor.Actor -import akka.dispatch.Dispatchers -import org.atmosphere.jersey.util.JerseyBroadcasterUtil - -object AkkaBroadcaster { - val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher") - - type Event = AtmosphereResourceEvent[_,_] - type Resource = AtmosphereResource[_,_] -} - -class AkkaBroadcaster extends org.atmosphere.jersey.util.JerseySimpleBroadcaster { - import AkkaBroadcaster._ - - //FIXME should be supervised - lazy val caster = actorOf(new Actor { - self.dispatcher = broadcasterDispatcher - def receive = { - case (r: Resource,e: Event) => JerseyBroadcasterUtil.broadcast(r,e) - } - }).start - - override def destroy { - super.destroy - caster.stop - } - - protected override def broadcast(r: Resource, e : Event) { - caster ! ((r,e)) - } -} diff --git a/akka-http/src/main/scala/akka/AkkaCometServlet.scala b/akka-http/src/main/scala/akka/AkkaCometServlet.scala deleted file mode 100644 index 5b15096c92..0000000000 --- a/akka-http/src/main/scala/akka/AkkaCometServlet.scala +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.comet - -import akka.util.Logging - -import java.util.{List => JList} -import javax.servlet.{ServletConfig,ServletContext} -import javax.servlet.http.{HttpServletRequest, HttpServletResponse} -import com.sun.jersey.spi.container.servlet.ServletContainer - -import org.atmosphere.container.GrizzlyCometSupport -import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} -import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} - -class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProcessor { - //Delegate to implement the behavior for AtmosphereHandler - private val handler = new AbstractReflectorAtmosphereHandler { - override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) { - if (event ne null) { - event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, event) - event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_HANDLER, this) - service(event.getRequest, event.getResponse) - } - } - } - - override def onStateChange(event: AtmosphereResourceEvent[HttpServletRequest, HttpServletResponse]) { - if (event ne null) handler onStateChange event - } - - override def onRequest(resource: AtmosphereResource[HttpServletRequest, HttpServletResponse]) { - handler onRequest resource - } - } - -/** - * Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a - * standard servlet container, e.g. not using the Akka Kernel. - *

- * Used by the Akka Kernel to bootstrap REST and Comet. - */ -class AkkaServlet extends AtmosphereServlet { - import akka.config.Config.{config => c} - - /* - * Configure Atmosphere and Jersey (default, fall-back values) - */ - addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true") - addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName) - addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true") - addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";")) - addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(",")) - - c.getInt("akka.rest.maxInactiveActivity") foreach { value => addInitParameter(CometSupport.MAX_INACTIVE,value.toString) } - c.getString("akka.rest.cometSupport") foreach { value => addInitParameter("cometSupport",value) } - - /* - * Provide a fallback for default values - */ - override def getInitParameter(key : String) = - Option(super.getInitParameter(key)).getOrElse(initParams get key) - - /* - * Provide a fallback for default values - */ - override def getInitParameterNames() = { - import scala.collection.JavaConversions._ - initParams.keySet.iterator ++ super.getInitParameterNames - } - - /** - * We override this to avoid Atmosphere looking for it's atmosphere.xml file - * Instead we specify what semantics we want in code. - */ - override def loadConfiguration(sc: ServletConfig) { - config.setSupportSession(false) - isBroadcasterSpecified = true - - //The bridge between Atmosphere and Jersey - val servlet = new AtmosphereRestServlet { - //These are needed to make sure that Jersey is reading the config from the outer servlet - override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key) - override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames() - } - - addAtmosphereHandler("/*", servlet, new AkkaBroadcaster) - } - - override lazy val createCometSupportResolver: CometSupportResolver = new DefaultCometSupportResolver(config) { - import scala.collection.JavaConversions._ - - lazy val desiredCometSupport = - Option(AkkaServlet.this.getInitParameter("cometSupport")) filter testClassExists map newCometSupport - - override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] = - desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault)) - } -}