Completed refactoring into lightweight modules akka-actor akka-typed-actor and akka-remote

This commit is contained in:
Jonas Bonér 2010-08-28 16:48:27 +02:00
parent adaf5d4065
commit da91cac150
238 changed files with 914 additions and 6284 deletions

View file

@ -502,6 +502,22 @@ private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {
def asSilently[T: Manifest]: Option[T] = narrowSilently[T](anyOption)
}
/**
* Marker interface for proxyable actors (such as typed actor).
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Proxyable {
def swapProxiedActor(newInstance: Actor)
private[actor] def swapProxiedActor(newInstance: Actor)
}
/**
* Represents the different Actor types.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed trait ActorType
object ActorType {
case object ScalaActor extends ActorType
case object TypedActor extends ActorType
}

View file

@ -11,9 +11,9 @@ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException}
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard}
import se.scalablesolutions.akka.remote.{RemoteClientModule, RemoteServerModule}
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.util._
import ReflectiveAccess._
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.commitbarriers.CountDownCommitBarrier
@ -743,7 +743,7 @@ class LocalActorRef private[akka](
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(hostname: String, port: Int): Unit = {
RemoteClientModule.ensureRemotingEnabled
ensureRemotingEnabled
if (!isRunning || isBeingRestarted) makeRemote(new InetSocketAddress(hostname, port))
else throw new ActorInitializationException(
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
@ -753,7 +753,7 @@ class LocalActorRef private[akka](
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(address: InetSocketAddress): Unit = guard.withGuard {
RemoteClientModule.ensureRemotingEnabled
ensureRemotingEnabled
if (!isRunning || isBeingRestarted) {
_remoteAddress = Some(address)
RemoteClientModule.register(address, uuid)
@ -829,10 +829,12 @@ class LocalActorRef private[akka](
_isShutDown = true
actor.shutdown
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
remoteAddress.foreach { address =>
RemoteClientModule.unregister(address, uuid)
}
RemoteClientModule.unregister(this)
RemoteServerModule.unregister(this)
}
nullOutActorRefReferencesFor(actorInstance.get)
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
}
@ -887,7 +889,7 @@ class LocalActorRef private[akka](
* To be invoked from within the actor itself.
*/
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withGuard {
RemoteClientModule.ensureRemotingEnabled
ensureRemotingEnabled
try {
actorRef.makeRemote(hostname, port)
actorRef.start
@ -913,7 +915,7 @@ class LocalActorRef private[akka](
* To be invoked from within the actor itself.
*/
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard {
RemoteClientModule.ensureRemotingEnabled
ensureRemotingEnabled
val actor = spawnButDoNotStart(clazz)
actor.makeRemote(hostname, port)
actor.start
@ -941,7 +943,7 @@ class LocalActorRef private[akka](
* To be invoked from within the actor itself.
*/
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard {
RemoteClientModule.ensureRemotingEnabled
ensureRemotingEnabled
val actor = spawnButDoNotStart(clazz)
try {
actor.makeRemote(hostname, port)
@ -978,8 +980,10 @@ class LocalActorRef private[akka](
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
joinTransaction(message)
if (remoteAddress.isDefined) RemoteClientModule.send(message, senderOption, None, remoteAddress.get, this)
else {
if (isRemotingEnabled && remoteAddress.isDefined) {
RemoteClientModule.send[Any](
message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
} else {
val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
invocation.send
}
@ -992,9 +996,12 @@ class LocalActorRef private[akka](
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
joinTransaction(message)
if (remoteAddress.isDefined) RemoteClientModule.send(
message, senderOption, senderFuture, remoteAddress.get, this)
else {
if (isRemotingEnabled && remoteAddress.isDefined) {
val future = RemoteClientModule.send[T](
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(
@ -1096,7 +1103,7 @@ class LocalActorRef private[akka](
}
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withGuard {
RemoteClientModule.ensureRemotingEnabled
ensureRemotingEnabled
if (_supervisor.isDefined) {
remoteAddress.foreach(address => RemoteClientModule.registerSupervisorForActor(address, this))
Some(_supervisor.get.uuid)
@ -1358,7 +1365,7 @@ private[akka] case class RemoteActorRef private[akka] (
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
RemoteClientModule.ensureRemotingEnabled
ensureRemotingEnabled
_uuid = uuuid
timeout = _timeout
@ -1367,14 +1374,16 @@ private[akka] case class RemoteActorRef private[akka] (
lazy val remoteClient = RemoteClientModule.clientFor(hostname, port, loader)
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
RemoteClientModule.send(message, senderOption, None, remoteAddress.get, this)
RemoteClientModule.send[Any](
message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = RemoteClientModule.send(message, senderOption, None, remoteAddress.get, this)
val future = RemoteClientModule.send[T](
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
@ -1397,6 +1406,8 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = None
val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port))
// ==== NOT SUPPORTED ====
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
@ -1407,7 +1418,6 @@ private[akka] case class RemoteActorRef private[akka] (
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
def homeAddress_=(address: InetSocketAddress): Unit = unsupported
def remoteAddress: Option[InetSocketAddress] = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported
@ -1460,6 +1470,7 @@ trait ActorRefShared {
* from ActorRef -> ScalaActorRef and back
*/
trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
/**
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
* <p/>
@ -1469,6 +1480,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* upon restart, remote restart etc.
*/
def id: String
def id_=(id: String): Unit
/**
@ -1580,7 +1592,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
val isMessageJoinPoint = TypedActorModule.resolveFutureIfMessageIsJoinPoint(message, future)
val isMessageJoinPoint = if (isTypedActorEnabled) TypedActorModule.resolveFutureIfMessageIsJoinPoint(message, future)
else false
try {
future.await
} catch {
@ -1664,8 +1677,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
/**
* Atomically create (from actor class), start and make an actor remote.
*/
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef =
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = {
ensureRemotingEnabled
spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
}
/**
@ -1674,10 +1689,11 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
def spawnLink[T <: Actor: Manifest]: ActorRef =
spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Atomically create (from actor class), start, link and make an actor remote.
*/
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef =
def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = {
ensureRemotingEnabled
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]],hostname,port)
}
}

View file

@ -12,6 +12,11 @@ import java.util.{Set=>JSet}
import se.scalablesolutions.akka.util.ListenerManagement
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed trait ActorRegistryEvent
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent

View file

@ -0,0 +1,101 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
import java.io.File
import java.net.{URL, URLClassLoader}
import java.util.jar.JarFile
import java.util.Enumeration
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.config.Config._
class AkkaDeployClassLoader(urls : List[URL], parent : ClassLoader) extends URLClassLoader(urls.toArray.asInstanceOf[Array[URL]],parent)
{
override def findResources(resource : String) = {
val normalResult = super.findResources(resource)
if(normalResult.hasMoreElements) normalResult else findDeployed(resource)
}
def findDeployed(resource : String) = new Enumeration[URL]{
private val it = getURLs.flatMap( listClassesInPackage(_,resource) ).iterator
def hasMoreElements = it.hasNext
def nextElement = it.next
}
def listClassesInPackage(jar : URL, pkg : String) = {
val f = new File(jar.getFile)
val jf = new JarFile(f)
try {
val es = jf.entries
var result = List[URL]()
while(es.hasMoreElements)
{
val e = es.nextElement
if(!e.isDirectory && e.getName.startsWith(pkg) && e.getName.endsWith(".class"))
result ::= new URL("jar:" + f.toURI.toURL + "!/" + e)
}
result
} finally {
jf.close
}
}
}
/**
* Handles all modules in the deploy directory (load and unload)
*/
trait BootableActorLoaderService extends Bootable with Logging {
val BOOT_CLASSES = config.getList("akka.boot")
lazy val applicationLoader: Option[ClassLoader] = createApplicationClassLoader
protected def createApplicationClassLoader : Option[ClassLoader] = {
Some(
if (HOME.isDefined) {
val CONFIG = HOME.get + "/config"
val DEPLOY = HOME.get + "/deploy"
val DEPLOY_DIR = new File(DEPLOY)
if (!DEPLOY_DIR.exists) {
log.error("Could not find a deploy directory at [%s]", DEPLOY)
System.exit(-1)
}
val filesToDeploy = DEPLOY_DIR.listFiles.toArray.toList
.asInstanceOf[List[File]].filter(_.getName.endsWith(".jar"))
var dependencyJars: List[URL] = Nil
filesToDeploy.map { file =>
val jarFile = new JarFile(file)
val en = jarFile.entries
while (en.hasMoreElements) {
val name = en.nextElement.getName
if (name.endsWith(".jar")) dependencyJars ::= new File(
String.format("jar:file:%s!/%s", jarFile.getName, name)).toURI.toURL
}
}
val toDeploy = filesToDeploy.map(_.toURI.toURL)
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy)
log.debug("Loading dependencies [%s]", dependencyJars)
val allJars = toDeploy ::: dependencyJars
new AkkaDeployClassLoader(allJars,Thread.currentThread.getContextClassLoader)
} else Thread.currentThread.getContextClassLoader)
}
abstract override def onLoad = {
applicationLoader.foreach(_ => log.info("Creating /deploy class-loader"))
super.onLoad
for (loader <- applicationLoader; clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
}
abstract override def onUnload = {
super.onUnload
ActorRegistry.shutdownAll
}
}

View file

@ -1,11 +1,15 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.stm.Ref
import se.scalablesolutions.akka.stm.local._
import java.util.concurrent.{ScheduledFuture, TimeUnit}
trait FSM[S] {
this: Actor =>
trait FSM[S] { this: Actor =>
type StateFunction = scala.PartialFunction[Event, State]
@ -20,7 +24,6 @@ trait FSM[S] {
State(NextState, currentState.stateFunction, stateData, currentState.timeout)
}
override final protected def receive: Receive = {
case value => {
timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None}

View file

@ -16,6 +16,7 @@
package se.scalablesolutions.akka.actor
import scala.collection.JavaConversions
import java.util.concurrent._
import se.scalablesolutions.akka.util.Logging

View file

@ -6,9 +6,9 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.remote.RemoteServerModule
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.util._
import ReflectiveAccess._
import Actor._
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}

View file

@ -0,0 +1,193 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException, ActorType}
import se.scalablesolutions.akka.dispatch.{Future, CompletableFuture}
import se.scalablesolutions.akka.config.{Config, ModuleNotAvailableException}
import java.net.InetSocketAddress
/**
* Helper class for reflective access to different modules in order to allow optional loading of modules.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ReflectiveAccess {
val loader = getClass.getClassLoader
lazy val isRemotingEnabled = RemoteClientModule.isRemotingEnabled
lazy val isTypedActorEnabled = TypedActorModule.isTypedActorEnabled
def ensureRemotingEnabled = RemoteClientModule.ensureRemotingEnabled
def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled
/**
* Reflective access to the RemoteClient module.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteClientModule {
type RemoteClient = {
def send[T](
message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[_]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]]
def registerSupervisorForActor(actorRef: ActorRef)
}
type RemoteClientObject = {
def register(hostname: String, port: Int, uuid: String): Unit
def unregister(hostname: String, port: Int, uuid: String): Unit
def clientFor(address: InetSocketAddress): RemoteClient
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient
}
lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined
def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException(
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
val remoteClientObjectInstance: Option[RemoteClientObject] = {
try {
val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteClient$")
val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteClientObject])
} catch { case e => None }
}
def register(address: InetSocketAddress, uuid: String) = {
ensureRemotingEnabled
remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid)
}
def unregister(address: InetSocketAddress, uuid: String) = {
ensureRemotingEnabled
remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid)
}
def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = {
ensureRemotingEnabled
val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress)
remoteClient.registerSupervisorForActor(actorRef)
}
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = {
ensureRemotingEnabled
remoteClientObjectInstance.get.clientFor(hostname, port, loader)
}
def send[T](
message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[_]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]] = {
ensureRemotingEnabled
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
}
}
/**
* Reflective access to the RemoteServer module.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServerModule {
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
val PORT = Config.config.getInt("akka.remote.server.port", 9999)
type RemoteServerObject = {
def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef): Unit
def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit
}
type RemoteNodeObject = {
def unregister(actorRef: ActorRef): Unit
}
val remoteServerObjectInstance: Option[RemoteServerObject] = {
try {
val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteServer$")
val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteServerObject])
} catch { case e => None }
}
val remoteNodeObjectInstance: Option[RemoteNodeObject] = {
try {
val clazz = loader.loadClass("se.scalablesolutions.akka.remote.RemoteNode$")
val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteNodeObject])
} catch { case e => None }
}
def registerActor(address: InetSocketAddress, uuid: String, actorRef: ActorRef) = {
ensureRemotingEnabled
remoteServerObjectInstance.get.registerActor(address, uuid, actorRef)
}
def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = {
ensureRemotingEnabled
remoteServerObjectInstance.get.registerTypedActor(address, implementationClassName, proxy)
}
def unregister(actorRef: ActorRef) = {
ensureRemotingEnabled
remoteNodeObjectInstance.get.unregister(actorRef)
}
}
/**
* Reflective access to the TypedActors module.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TypedActorModule {
type TypedActorObject = {
def isJoinPoint(message: Any): Boolean
def isJoinPointAndOneWay(message: Any): Boolean
}
lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined
def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
"Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath")
val typedActorObjectInstance: Option[TypedActorObject] = {
try {
val clazz = loader.loadClass("se.scalablesolutions.akka.actor.TypedActor$")
val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[TypedActorObject])
} catch { case e => None }
}
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
ensureTypedActorEnabled
if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) {
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
}
typedActorObjectInstance.get.isJoinPoint(message)
}
}
}

View file

@ -0,0 +1,13 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka
abstract class TestMessage
case object Ping extends TestMessage
case object Pong extends TestMessage
case object OneWay extends TestMessage
case object Die extends TestMessage
case object NotifySupervisorExit extends TestMessage

View file

@ -22,7 +22,7 @@ import se.scalablesolutions.akka.actor.ActorRegistry
@RunWith(classOf[JUnitRunner])
class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
describe("DataflowVariable") {
it("should work and generate correct results") {
/* it("should work and generate correct results") {
import DataFlow._
val latch = new CountDownLatch(1)
@ -73,8 +73,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
result.get should equal (sum(0,ints(0,1000)))
ActorRegistry.shutdownAll
}
*/
}
/*it("should be able to join streams") {
import DataFlow._

View file

@ -1,173 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.dataflow
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import java.util.concurrent.{TimeUnit, CountDownLatch}
import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger}
import scala.annotation.tailrec
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture
import se.scalablesolutions.akka.actor.ActorRegistry
@RunWith(classOf[JUnitRunner])
class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
describe("DataflowVariable") {
it("should work and generate correct results") {
import DataFlow._
val latch = new CountDownLatch(1)
val result = new AtomicInteger(0)
val x, y, z = new DataFlowVariable[Int]
thread {
z << x() + y()
latch.countDown
result.set(z())
}
thread { x << 40 }
thread { y << 2 }
latch.await(3,TimeUnit.SECONDS) should equal (true)
List(x,y,z).foreach(_.shutdown)
result.get should equal (42)
ActorRegistry.shutdownAll
}
it("should be able to transform a stream") {
import DataFlow._
def ints(n: Int, max: Int): List[Int] =
if (n == max) Nil
else n :: ints(n + 1, max)
def sum(s: Int, stream: List[Int]): List[Int] = stream match {
case Nil => s :: Nil
case h :: t => s :: sum(h + s, t)
}
val latch = new CountDownLatch(1)
val result = new AtomicReference[List[Int]](Nil)
val x = new DataFlowVariable[List[Int]]
val y = new DataFlowVariable[List[Int]]
val z = new DataFlowVariable[List[Int]]
thread { x << ints(0, 1000) }
thread { y << sum(0, x()) }
thread { z << y()
result.set(z())
latch.countDown
}
latch.await(3,TimeUnit.SECONDS) should equal (true)
List(x,y,z).foreach(_.shutdown)
result.get should equal (sum(0,ints(0,1000)))
ActorRegistry.shutdownAll
}
}
/*it("should be able to join streams") {
import DataFlow._
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
out <<< s
sum(in() + s, in, out)
}
val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]
val latch = new CountDownLatch(1)
val result = new AtomicInteger(0)
thread { ints(0, 1000, producer) }
thread {
Thread.sleep(1000)
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
latch.countDown
}
latch.await(3,TimeUnit.SECONDS) should equal (true)
result.get should equal (332833500)
ActorRegistry.shutdownAll
}
it("should be able to sum streams recursively") {
import DataFlow._
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
out <<< s
sum(in() + s, in, out)
}
val result = new AtomicLong(0)
val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]
val latch = new CountDownLatch(1)
@tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = {
val x = stream()
if(result.addAndGet(x) == 166666500)
latch.countDown
recurseSum(stream)
}
thread { ints(0, 1000, producer) }
thread { sum(0, producer, consumer) }
thread { recurseSum(consumer) }
latch.await(15,TimeUnit.SECONDS) should equal (true)
ActorRegistry.shutdownAll
}*/
/* Test not ready for prime time, causes some sort of deadlock */
/* it("should be able to conditionally set variables") {
import DataFlow._
val latch = new CountDownLatch(1)
val x, y, z, v = new DataFlowVariable[Int]
val main = thread {
x << 1
z << Math.max(x(),y())
latch.countDown
}
val setY = thread {
Thread sleep 2000
y << 2
}
val setV = thread {
v << y
}
latch.await(2,TimeUnit.SECONDS) should equal (true)
List(x,y,z,v) foreach (_.shutdown)
List(main,setY,setV) foreach (_ ! Exit)
println("Foo")
ActorRegistry.shutdownAll
}*/
}

View file

@ -1,79 +0,0 @@
(
;; Where you unpacked the ENSIME distribution.
:server-root "/Users/jboner/config/emacs-config/lib/ensime"
;; The command with which to invoke the ENSIME server. Change this to
;; "bin/server.bat" if your're on Windows.
:server-cmd "bin/server.sh"
;; The host to connect to. Connecting to remote ENSIME servers is not
;; currently supported.
;; ------------------------------
;; :server-host "localhost"
;; Assume a standard sbt directory structure. Look in default sbt
;; locations for dependencies, sources, target, etc.
;;
;; Note for sbt subprojects: Each subproject needs it's own .ensime
;; file.
;; -----------------------------
:use-sbt t
:sbt-compile-conf "compile"
;; Use an existing pom.xml to determine the dependencies
;; for the project. A Maven-style directory structure is assumed.
;; -----------------------------
;; :use-maven t
;; :maven-compile-scopes "compile"
;; :maven-runtime-scopes "runtime"
;; Use an existing ivy.xml to determine the dependencies
;; for the project. A Maven-style directory structure is assumed.
;; -----------------------------
;; :use-ivy t
;; :ivy-compile-conf "compile"
;; :ivy-runtime-conf "compile"
;; The home package for your project.
;; Used by ENSIME to populate the project outline view.
;; ------------------------------
:project-package "se.scalablesolutions.akka"
;; :sources ([dir | file]*)
;; Include source files by directory(recursively) or by filename.
;; ------------------------------
:sources ("src/main/")
;; :dependency-jars ([dir | file]*)
;; Include jars by directory(recursively) or by filename.
;; ------------------------------
;; :dependency-jars ("lib")
;; :dependency-dirs ([dir | file]*)
;; Include directories of .class files.
;; ------------------------------
;; :dependency-dirs ("target/classes")
;; :target dir
;; Specify the target of the project build process. Should be
;; the directory where .class files are written
;;
;; The target is used to populate the classpath when launching
;; the inferior scala repl.
;; ------------------------------
;; :target "target/classes"
)

View file

@ -1,44 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka
import se.scalablesolutions.akka.serialization.Serializable
import sbinary._
import sbinary.Operations._
sealed abstract class TestMessage
case object Ping extends TestMessage
case object Pong extends TestMessage
case object OneWay extends TestMessage
case object Die extends TestMessage
case object NotifySupervisorExit extends TestMessage
case class User(val usernamePassword: Tuple2[String, String],
val email: String,
val age: Int)
extends Serializable.SBinary[User] {
def this() = this(null, null, 0)
import sbinary.DefaultProtocol._
implicit object UserFormat extends Format[User] {
def reads(in : Input) = User(
read[Tuple2[String, String]](in),
read[String](in),
read[Int](in))
def writes(out: Output, value: User) = {
write[Tuple2[String, String]](out, value.usernamePassword)
write[String](out, value.email)
write[Int](out, value.age)
}
}
def fromBytes(bytes: Array[Byte]) = fromByteArray[User](bytes)
def toBytes: Array[Byte] = toByteArray(this)
}
case object RemotePing extends TestMessage
case object RemotePong extends TestMessage
case object RemoteOneWay extends TestMessage
case object RemoteDie extends TestMessage
case object RemoteNotifySupervisorExit extends TestMessage

Some files were not shown because too many files have changed in this diff Show more