Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2010-11-25 10:17:02 +01:00
commit 3362dab215
15 changed files with 75 additions and 109 deletions

View file

@ -16,7 +16,5 @@ package object actor {
type Uuid = com.eaio.uuid.UUID
def newUuid(): Uuid = new Uuid()
def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode)
def uuidFrom(uuid: String): Uuid = {
new Uuid(uuid)
}
def uuidFrom(uuid: String): Uuid = new Uuid(uuid)
}

View file

@ -24,22 +24,26 @@ object DataFlow {
class DataFlowVariableException(msg: String) extends AkkaException(msg)
/** Executes the supplied thunk in another thread
/**
* Executes the supplied thunk in another thread.
*/
def thread(body: => Unit): Unit = spawn(body)
/** Executes the supplied Effect in another thread
* JavaAPI
/**
* JavaAPI.
* Executes the supplied Effect in another thread.
*/
def thread(body: Effect): Unit = spawn(body.apply)
/** Executes the supplied function in another thread
/**
* Executes the supplied function in another thread.
*/
def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
actorOf(new ReactiveEventBasedThread(body)).start
/** Executes the supplied Function in another thread
* JavaAPI
/**
* JavaAPI.
* Executes the supplied Function in another thread.
*/
def thread[A <: AnyRef, R <: AnyRef](body: Function[A,R]) =
actorOf(new ReactiveEventBasedThread(body.apply)).start
@ -97,7 +101,8 @@ object DataFlow {
private[this] val in = actorOf(new In(this)).start
/** Sets the value of this variable (if unset) with the value of the supplied variable
/**
* Sets the value of this variable (if unset) with the value of the supplied variable.
*/
def <<(ref: DataFlowVariable[T]) {
if (this.value.get.isEmpty) in ! Set(ref())
@ -105,12 +110,14 @@ object DataFlow {
"Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])")
}
/** Sets the value of this variable (if unset) with the value of the supplied variable
* JavaAPI
/**
* JavaAPI.
* Sets the value of this variable (if unset) with the value of the supplied variable.
*/
def set(ref: DataFlowVariable[T]) { this << ref }
/** Sets the value of this variable (if unset)
/**
* Sets the value of this variable (if unset).
*/
def <<(value: T) {
if (this.value.get.isEmpty) in ! Set(value)
@ -118,18 +125,19 @@ object DataFlow {
"Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])")
}
/** Sets the value of this variable (if unset) with the value of the supplied variable
* JavaAPI
/**
* JavaAPI.
* Sets the value of this variable (if unset) with the value of the supplied variable.
*/
def set(value: T) { this << value }
/** Retrieves the value of variable
* throws a DataFlowVariableException if it times out
/**
* Retrieves the value of variable, throws a DataFlowVariableException if it times out.
*/
def get(): T = this()
/** Retrieves the value of variable
* throws a DataFlowVariableException if it times out
/**
* Retrieves the value of variable, throws a DataFlowVariableException if it times out.
*/
def apply(): T = {
value.get getOrElse {
@ -144,52 +152,11 @@ object DataFlow {
throw e
}
result.getOrElse(throw new DataFlowVariableException("Timed out (after " + timeoutMs + " milliseconds) while waiting for result"))
result.getOrElse(throw new DataFlowVariableException(
"Timed out (after " + timeoutMs + " milliseconds) while waiting for result"))
}
}
def shutdown = in ! Exit
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DataFlowStream[T <: Any] extends Seq[T] {
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
def <<<(ref: DataFlowVariable[T]) = queue.offer(ref)
def <<<(value: T) = {
val ref = new DataFlowVariable[T]
ref << value
queue.offer(ref)
}
def apply(): T = {
val ref = queue.take
val result = ref()
ref.shutdown
result
}
def take: DataFlowVariable[T] = queue.take
//==== For Seq ====
def length: Int = queue.size
def apply(i: Int): T = {
if (i == 0) apply()
else throw new UnsupportedOperationException(
"Access by index other than '0' is not supported by DataFlowStream")
}
def iterator: Iterator[T] = new Iterator[T] {
private val iter = queue.iterator
def hasNext: Boolean = iter.hasNext
def next: T = { val ref = iter.next; ref() }
}
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
}
}

View file

@ -120,7 +120,7 @@ class ExecutorBasedEventDrivenDispatcher(
/**
* Creates and returns a durable mailbox for the given actor.
*/
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef =
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef =
createMailbox(mailboxType.mailboxImplClassname, actorRef)
private[akka] def start = log.slf4j.debug("Starting up {}\n\twith throughput [{}]", this, throughput)

View file

@ -224,7 +224,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
/**
* Creates and returns a durable mailbox for the given actor.
*/
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef =
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef =
createMailbox(mailboxType.mailboxImplClassname, actorRef)
private[akka] override def register(actorRef: ActorRef) = {

View file

@ -69,8 +69,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
result.get should equal (sum(0,ints(0,1000)))
List(x,y,z).foreach(_.shutdown)
}
/*it("should be able to join streams") {
/*
it("should be able to join streams") {
import DataFlow._
ActorRegistry.shutdownAll
@ -133,8 +133,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
thread { recurseSum(consumer) }
latch.await(15,TimeUnit.SECONDS) should equal (true)
}*/
}
*/
/* Test not ready for prime time, causes some sort of deadlock */
/* it("should be able to conditionally set variables") {

View file

@ -84,7 +84,7 @@ trait JettyContinuation extends ContinuationListener with akka.util.Logging
case None => false
case Some(continuation) => (continuation.isSuspended || (continuation.getAttribute(TimeoutAttribute) ne null))
}
def timeout(ms:Long):Boolean = _continuation match {
case None => false
case Some(continuation) =>

View file

@ -5,9 +5,10 @@
package akka.http
import akka.util.Logging
import javax.servlet.http.HttpServlet
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import akka.actor.{ActorRegistry, ActorRef, Actor}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import javax.servlet.http.HttpServlet
import javax.servlet.Filter
/**
@ -314,7 +315,7 @@ class RootEndpoint extends Actor with Endpoint {
/**
* Basic description of the suspended async http request.
* Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations)
* Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations)
*
* @author Garrick Evans
*/
@ -359,8 +360,8 @@ trait RequestMethod extends Logging
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
request.getHeader(name) match {
case null => default(null)
case s => s
}
case s => s
}
def getParameterOrElse(name: String, default: Function[Any, String]): String =
request.getParameter(name) match {

View file

@ -22,7 +22,7 @@ trait BootableRemoteActorService extends Bootable with Logging {
else RemoteNode.start
}
}, "Akka Remote Service")
def startRemoteService = remoteServerThread.start
abstract override def onLoad = {

View file

@ -290,15 +290,15 @@ class RemoteClient private[akka] (
else None
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
Left(actorRef.uuid),
actorRef.id,
actorRef.actorClassName,
actorRef.timeout,
Left(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
Left(actorRef.uuid),
actorRef.id,
actorRef.actorClassName,
actorRef.timeout,
Left(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
cookie
).build, senderFuture)
}

View file

@ -161,16 +161,16 @@ case class RemoteServerStarted(
case class RemoteServerShutdown(
@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
case class RemoteServerError(
@BeanProperty val cause: Throwable,
@BeanProperty val cause: Throwable,
@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
case class RemoteServerClientConnected(
@BeanProperty val server: RemoteServer,
@BeanProperty val server: RemoteServer,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientDisconnected(
@BeanProperty val server: RemoteServer,
@BeanProperty val server: RemoteServer,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientClosed(
@BeanProperty val server: RemoteServer,
@BeanProperty val server: RemoteServer,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
/**
@ -726,7 +726,7 @@ class RemoteServerHandler(
}
private def findActorFactory(id: String) : () => ActorRef = {
server.actorsFactories.get(id)
server.actorsFactories.get(id)
}
private def findSessionActor(id: String, channel: Channel) : ActorRef = {
@ -840,7 +840,7 @@ class RemoteServerHandler(
{
// the actor has not been registered globally. See if we have it in the session
val sessionActorRefOrNull = createSessionActor(actorInfo, channel)
if (sessionActorRefOrNull ne null)
if (sessionActorRefOrNull ne null)
sessionActorRefOrNull
else // maybe it is a client managed actor
createClientManagedActor(actorInfo)
@ -863,7 +863,7 @@ class RemoteServerHandler(
newInstance
}
else
null
null
}
}
@ -921,15 +921,15 @@ class RemoteServerHandler(
log.slf4j.debug("Could not invoke remote actor", exception)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Right(exception),
true,
None,
None,
actorType,
Right(request.getUuid),
actorInfo.getId,
actorInfo.getTarget,
actorInfo.getTimeout,
Right(exception),
true,
None,
None,
actorType,
None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
messageBuilder.build

View file

@ -299,9 +299,9 @@ object RemoteActorSerialization {
.setOneWay(isOneWay)
message match {
case Left(message) =>
case Left(message) =>
messageBuilder.setMessage(MessageSerializer.serialize(message))
case Right(exception) =>
case Right(exception) =>
messageBuilder.setException(ExceptionProtocol.newBuilder
.setClassname(exception.getClass.getName)
.setMessage(exception.getMessage)
@ -310,7 +310,7 @@ object RemoteActorSerialization {
secureCookie.foreach(messageBuilder.setCookie(_))
actorRef.foreach { ref =>
actorRef.foreach { ref =>
ref.registerSupervisorAsRemoteActor.foreach { id =>
messageBuilder.setSupervisorUuid(
UuidProtocol.newBuilder

View file

@ -4,7 +4,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import akka.util._
import akka.remote.{RemoteServer, RemoteClient}
import akka.actor.Actor._
import akka.actor.{ActorRegistry, ActorRef, Actor}

View file

@ -42,7 +42,7 @@ object ServerInitiatedRemoteSessionActorSpec {
def receive = {
case Login(user) =>
this.user = user
this.user = user
case GetUser() =>
self.reply(this.user)
case DoSomethingFunny() =>

View file

@ -98,7 +98,7 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends
it should "be able to unregister" in {
server.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000))
server.typedActorsFactories.get("my-service-1") should not be (null)
server.unregisterTypedPerSessionActor("my-service-1")
server.typedActorsFactories.get("my-service-1") should be (null)

View file

@ -111,7 +111,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val SLF4J_VERSION = "1.6.0"
lazy val JETTY_VERSION = "7.1.6.v20100715"
lazy val JAVAX_SERVLET_VERSION = "3.0"
// -------------------------------------------------------------------------------------------------------------------
// Dependencies
@ -138,7 +138,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" //Eclipse license
lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile" //Eclipse license
lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" //Eclipse license
@ -154,7 +154,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jersey = "com.sun.jersey" % "jersey-core" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1