diff --git a/akka-actor/src/main/scala/akka/actor/Implicits.scala b/akka-actor/src/main/scala/akka/actor/Implicits.scala index 668d2d8876..a2d6386d97 100644 --- a/akka-actor/src/main/scala/akka/actor/Implicits.scala +++ b/akka-actor/src/main/scala/akka/actor/Implicits.scala @@ -16,7 +16,5 @@ package object actor { type Uuid = com.eaio.uuid.UUID def newUuid(): Uuid = new Uuid() def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode) - def uuidFrom(uuid: String): Uuid = { - new Uuid(uuid) - } + def uuidFrom(uuid: String): Uuid = new Uuid(uuid) } diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlowVariable.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala similarity index 66% rename from akka-actor/src/main/scala/akka/dataflow/DataFlowVariable.scala rename to akka-actor/src/main/scala/akka/dataflow/DataFlow.scala index 9becd42e48..5d09a1715f 100644 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlowVariable.scala +++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala @@ -24,22 +24,26 @@ object DataFlow { class DataFlowVariableException(msg: String) extends AkkaException(msg) - /** Executes the supplied thunk in another thread + /** + * Executes the supplied thunk in another thread. */ def thread(body: => Unit): Unit = spawn(body) - /** Executes the supplied Effect in another thread - * JavaAPI + /** + * JavaAPI. + * Executes the supplied Effect in another thread. */ def thread(body: Effect): Unit = spawn(body.apply) - /** Executes the supplied function in another thread + /** + * Executes the supplied function in another thread. */ def thread[A <: AnyRef, R <: AnyRef](body: A => R) = actorOf(new ReactiveEventBasedThread(body)).start - /** Executes the supplied Function in another thread - * JavaAPI + /** + * JavaAPI. + * Executes the supplied Function in another thread. */ def thread[A <: AnyRef, R <: AnyRef](body: Function[A,R]) = actorOf(new ReactiveEventBasedThread(body.apply)).start @@ -97,7 +101,8 @@ object DataFlow { private[this] val in = actorOf(new In(this)).start - /** Sets the value of this variable (if unset) with the value of the supplied variable + /** + * Sets the value of this variable (if unset) with the value of the supplied variable. */ def <<(ref: DataFlowVariable[T]) { if (this.value.get.isEmpty) in ! Set(ref()) @@ -105,12 +110,14 @@ object DataFlow { "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])") } - /** Sets the value of this variable (if unset) with the value of the supplied variable - * JavaAPI + /** + * JavaAPI. + * Sets the value of this variable (if unset) with the value of the supplied variable. */ def set(ref: DataFlowVariable[T]) { this << ref } - /** Sets the value of this variable (if unset) + /** + * Sets the value of this variable (if unset). */ def <<(value: T) { if (this.value.get.isEmpty) in ! Set(value) @@ -118,18 +125,19 @@ object DataFlow { "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])") } - /** Sets the value of this variable (if unset) with the value of the supplied variable - * JavaAPI + /** + * JavaAPI. + * Sets the value of this variable (if unset) with the value of the supplied variable. */ def set(value: T) { this << value } - /** Retrieves the value of variable - * throws a DataFlowVariableException if it times out + /** + * Retrieves the value of variable, throws a DataFlowVariableException if it times out. */ def get(): T = this() - /** Retrieves the value of variable - * throws a DataFlowVariableException if it times out + /** + * Retrieves the value of variable, throws a DataFlowVariableException if it times out. */ def apply(): T = { value.get getOrElse { @@ -144,52 +152,11 @@ object DataFlow { throw e } - result.getOrElse(throw new DataFlowVariableException("Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) + result.getOrElse(throw new DataFlowVariableException( + "Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) } } def shutdown = in ! Exit } - - /** - * @author Jonas Bonér - */ - class DataFlowStream[T <: Any] extends Seq[T] { - private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]] - - def <<<(ref: DataFlowVariable[T]) = queue.offer(ref) - - def <<<(value: T) = { - val ref = new DataFlowVariable[T] - ref << value - queue.offer(ref) - } - - def apply(): T = { - val ref = queue.take - val result = ref() - ref.shutdown - result - } - - def take: DataFlowVariable[T] = queue.take - - //==== For Seq ==== - - def length: Int = queue.size - - def apply(i: Int): T = { - if (i == 0) apply() - else throw new UnsupportedOperationException( - "Access by index other than '0' is not supported by DataFlowStream") - } - - def iterator: Iterator[T] = new Iterator[T] { - private val iter = queue.iterator - def hasNext: Boolean = iter.hasNext - def next: T = { val ref = iter.next; ref() } - } - - override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]] - } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 1b61195bc6..28bacbe34f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -120,7 +120,7 @@ class ExecutorBasedEventDrivenDispatcher( /** * Creates and returns a durable mailbox for the given actor. */ - private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = + private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = createMailbox(mailboxType.mailboxImplClassname, actorRef) private[akka] def start = log.slf4j.debug("Starting up {}\n\twith throughput [{}]", this, throughput) diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 7ced9f8f0e..284512db82 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -224,7 +224,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( /** * Creates and returns a durable mailbox for the given actor. */ - private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = + private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = createMailbox(mailboxType.mailboxImplClassname, actorRef) private[akka] override def register(actorRef: ActorRef) = { diff --git a/akka-actor/src/test/scala/akka/dataflow/DataFlowSpec.scala b/akka-actor/src/test/scala/akka/dataflow/DataFlowSpec.scala index f787060fc8..d3b2b7d57a 100644 --- a/akka-actor/src/test/scala/akka/dataflow/DataFlowSpec.scala +++ b/akka-actor/src/test/scala/akka/dataflow/DataFlowSpec.scala @@ -69,8 +69,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { result.get should equal (sum(0,ints(0,1000))) List(x,y,z).foreach(_.shutdown) } - - /*it("should be able to join streams") { +/* + it("should be able to join streams") { import DataFlow._ ActorRegistry.shutdownAll @@ -133,8 +133,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { thread { recurseSum(consumer) } latch.await(15,TimeUnit.SECONDS) should equal (true) - }*/ - + } +*/ /* Test not ready for prime time, causes some sort of deadlock */ /* it("should be able to conditionally set variables") { diff --git a/akka-http/src/main/scala/JettyContinuation.scala b/akka-http/src/main/scala/JettyContinuation.scala index 3d53548352..f6d1493842 100644 --- a/akka-http/src/main/scala/JettyContinuation.scala +++ b/akka-http/src/main/scala/JettyContinuation.scala @@ -84,7 +84,7 @@ trait JettyContinuation extends ContinuationListener with akka.util.Logging case None => false case Some(continuation) => (continuation.isSuspended || (continuation.getAttribute(TimeoutAttribute) ne null)) } - + def timeout(ms:Long):Boolean = _continuation match { case None => false case Some(continuation) => diff --git a/akka-http/src/main/scala/Mist.scala b/akka-http/src/main/scala/Mist.scala index b3296a5202..3cc5faa40b 100644 --- a/akka-http/src/main/scala/Mist.scala +++ b/akka-http/src/main/scala/Mist.scala @@ -5,9 +5,10 @@ package akka.http import akka.util.Logging -import javax.servlet.http.HttpServlet -import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import akka.actor.{ActorRegistry, ActorRef, Actor} + +import javax.servlet.http.{HttpServletResponse, HttpServletRequest} +import javax.servlet.http.HttpServlet import javax.servlet.Filter /** @@ -314,7 +315,7 @@ class RootEndpoint extends Actor with Endpoint { /** * Basic description of the suspended async http request. - * Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations) + * Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations) * * @author Garrick Evans */ @@ -359,8 +360,8 @@ trait RequestMethod extends Logging def getHeaderOrElse(name: String, default: Function[Any, String]): String = request.getHeader(name) match { case null => default(null) - case s => s - } + case s => s + } def getParameterOrElse(name: String, default: Function[Any, String]): String = request.getParameter(name) match { diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index a6a4a9c5e7..70b6154fda 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -22,7 +22,7 @@ trait BootableRemoteActorService extends Bootable with Logging { else RemoteNode.start } }, "Akka Remote Service") - + def startRemoteService = remoteServerThread.start abstract override def onLoad = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala index d19d4eeeb4..8cc87a521f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala @@ -290,15 +290,15 @@ class RemoteClient private[akka] ( else None send(createRemoteMessageProtocolBuilder( Some(actorRef), - Left(actorRef.uuid), - actorRef.id, - actorRef.actorClassName, - actorRef.timeout, - Left(message), - isOneWay, - senderOption, - typedActorInfo, - actorType, + Left(actorRef.uuid), + actorRef.id, + actorRef.actorClassName, + actorRef.timeout, + Left(message), + isOneWay, + senderOption, + typedActorInfo, + actorType, cookie ).build, senderFuture) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala index 71a306f743..e0c51ee77b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteServer.scala @@ -161,16 +161,16 @@ case class RemoteServerStarted( case class RemoteServerShutdown( @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent case class RemoteServerError( - @BeanProperty val cause: Throwable, + @BeanProperty val cause: Throwable, @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent case class RemoteServerClientConnected( - @BeanProperty val server: RemoteServer, + @BeanProperty val server: RemoteServer, @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent case class RemoteServerClientDisconnected( - @BeanProperty val server: RemoteServer, + @BeanProperty val server: RemoteServer, @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent case class RemoteServerClientClosed( - @BeanProperty val server: RemoteServer, + @BeanProperty val server: RemoteServer, @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent /** @@ -726,7 +726,7 @@ class RemoteServerHandler( } private def findActorFactory(id: String) : () => ActorRef = { - server.actorsFactories.get(id) + server.actorsFactories.get(id) } private def findSessionActor(id: String, channel: Channel) : ActorRef = { @@ -840,7 +840,7 @@ class RemoteServerHandler( { // the actor has not been registered globally. See if we have it in the session val sessionActorRefOrNull = createSessionActor(actorInfo, channel) - if (sessionActorRefOrNull ne null) + if (sessionActorRefOrNull ne null) sessionActorRefOrNull else // maybe it is a client managed actor createClientManagedActor(actorInfo) @@ -863,7 +863,7 @@ class RemoteServerHandler( newInstance } else - null + null } } @@ -921,15 +921,15 @@ class RemoteServerHandler( log.slf4j.debug("Could not invoke remote actor", exception) val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( None, - Right(request.getUuid), - actorInfo.getId, - actorInfo.getTarget, - actorInfo.getTimeout, - Right(exception), - true, - None, - None, - actorType, + Right(request.getUuid), + actorInfo.getId, + actorInfo.getTarget, + actorInfo.getTimeout, + Right(exception), + true, + None, + None, + actorType, None) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) messageBuilder.build diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index c0e49c80c5..5ed4d9a969 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -299,9 +299,9 @@ object RemoteActorSerialization { .setOneWay(isOneWay) message match { - case Left(message) => + case Left(message) => messageBuilder.setMessage(MessageSerializer.serialize(message)) - case Right(exception) => + case Right(exception) => messageBuilder.setException(ExceptionProtocol.newBuilder .setClassname(exception.getClass.getName) .setMessage(exception.getMessage) @@ -310,7 +310,7 @@ object RemoteActorSerialization { secureCookie.foreach(messageBuilder.setCookie(_)) - actorRef.foreach { ref => + actorRef.foreach { ref => ref.registerSupervisorAsRemoteActor.foreach { id => messageBuilder.setSupervisorUuid( UuidProtocol.newBuilder diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 5881687c01..14387e7909 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -4,7 +4,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.{Test, Before, After} import akka.util._ - + import akka.remote.{RemoteServer, RemoteClient} import akka.actor.Actor._ import akka.actor.{ActorRegistry, ActorRef, Actor} diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala index 173898c1fe..982fc3e642 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala @@ -42,7 +42,7 @@ object ServerInitiatedRemoteSessionActorSpec { def receive = { case Login(user) => - this.user = user + this.user = user case GetUser() => self.reply(this.user) case DoSomethingFunny() => diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala index 0ae4ca2dee..0308ea53b3 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedSessionActorSpec.scala @@ -98,7 +98,7 @@ class ServerInitiatedRemoteTypedSessionActorSpec extends it should "be able to unregister" in { server.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000)) - + server.typedActorsFactories.get("my-service-1") should not be (null) server.unregisterTypedPerSessionActor("my-service-1") server.typedActorsFactories.get("my-service-1") should be (null) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 58cfaf185a..a0b238a832 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -111,7 +111,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val SLF4J_VERSION = "1.6.0" lazy val JETTY_VERSION = "7.1.6.v20100715" lazy val JAVAX_SERVLET_VERSION = "3.0" - + // ------------------------------------------------------------------------------------------------------------------- // Dependencies @@ -138,7 +138,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2 lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1 - + lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" //Eclipse license lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile" //Eclipse license lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" //Eclipse license @@ -154,7 +154,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 - + lazy val jersey = "com.sun.jersey" % "jersey-core" % JERSEY_VERSION % "compile" //CDDL v1 lazy val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile" //CDDL v1 lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1