Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
96367d9575
20 changed files with 316 additions and 155 deletions
|
|
@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, WordSpec, BeforeAndAfterEach }
|
|||
import akka.actor.TypedActor._
|
||||
import akka.japi.{ Option ⇒ JOption }
|
||||
import akka.util.Duration
|
||||
import akka.dispatch.{ Dispatchers, Future, AlreadyCompletedFuture }
|
||||
import akka.dispatch.{ Dispatchers, Future, KeptPromise }
|
||||
import akka.routing.CyclicIterator
|
||||
|
||||
object TypedActorSpec {
|
||||
|
|
@ -43,7 +43,7 @@ object TypedActorSpec {
|
|||
|
||||
def pigdog = "Pigdog"
|
||||
|
||||
def futurePigdog(): Future[String] = new AlreadyCompletedFuture(Right(pigdog))
|
||||
def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog))
|
||||
def futurePigdog(delay: Long): Future[String] = {
|
||||
Thread.sleep(delay)
|
||||
futurePigdog
|
||||
|
|
@ -51,7 +51,7 @@ object TypedActorSpec {
|
|||
|
||||
def futurePigdog(delay: Long, numbered: Int): Future[String] = {
|
||||
Thread.sleep(delay)
|
||||
new AlreadyCompletedFuture(Right(pigdog + numbered))
|
||||
new KeptPromise(Right(pigdog + numbered))
|
||||
}
|
||||
|
||||
def futureComposePigdogFrom(foo: Foo): Future[String] =
|
||||
|
|
@ -132,6 +132,12 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
|
|||
stop(null) must be(false)
|
||||
}
|
||||
|
||||
"throw an IllegalStateExcpetion when TypedActor.self is called in the wrong scope" in {
|
||||
(intercept[IllegalStateException] {
|
||||
TypedActor.self[Foo]
|
||||
}).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!")
|
||||
}
|
||||
|
||||
"have access to itself when executing a method call" in {
|
||||
val t = newFooBar
|
||||
t.self must be(t)
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
|
|||
|
||||
//CANDIDATE FOR TESTKIT
|
||||
def spawn[T <: AnyRef](fun: ⇒ T)(implicit within: Duration): Future[T] = {
|
||||
val result = new DefaultCompletableFuture[T](within.length, within.unit)
|
||||
val result = new DefaultPromise[T](within.length, within.unit)
|
||||
val t = new Thread(new Runnable {
|
||||
def run = try {
|
||||
result.completeWithResult(fun)
|
||||
|
|
|
|||
|
|
@ -3,15 +3,14 @@ package akka.actor
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import Actor._
|
||||
import java.util.concurrent.{ CyclicBarrier, TimeUnit, CountDownLatch }
|
||||
import org.scalatest.Assertions._
|
||||
import java.util.concurrent.{ ConcurrentLinkedQueue, CyclicBarrier, TimeUnit, CountDownLatch }
|
||||
import akka.dispatch.Future
|
||||
|
||||
object ActorRegistrySpec {
|
||||
var record = ""
|
||||
class TestActor extends Actor {
|
||||
def receive = {
|
||||
case "ping" ⇒
|
||||
record = "pong" + record
|
||||
self.reply("got ping")
|
||||
}
|
||||
}
|
||||
|
|
@ -19,10 +18,8 @@ object ActorRegistrySpec {
|
|||
class TestActor2 extends Actor {
|
||||
def receive = {
|
||||
case "ping" ⇒
|
||||
record = "pong" + record
|
||||
self.reply("got ping")
|
||||
case "ping2" ⇒
|
||||
record = "pong" + record
|
||||
self.reply("got ping")
|
||||
}
|
||||
}
|
||||
|
|
@ -41,6 +38,7 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
assert(actor2.get.address === actor1.address)
|
||||
assert(actor2.get.address === "test-actor-1")
|
||||
actor2.get.stop
|
||||
assert(Actor.registry.actorFor(actor1.address).isEmpty)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -54,6 +52,7 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
assert(actorOrNone.get.uuid === uuid)
|
||||
assert(actorOrNone.get.address === "test-actor-1")
|
||||
actor.stop
|
||||
assert(Actor.registry.local.actorFor(uuid).isEmpty)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -71,10 +70,8 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
@Test
|
||||
def shouldGetAllActorsFromLocalActorRegistry {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor1 = actorOf[TestActor]("test-actor-1")
|
||||
actor1.start
|
||||
val actor2 = actorOf[TestActor]("test-actor-2")
|
||||
actor2.start
|
||||
val actor1 = actorOf[TestActor]("test-actor-1").start
|
||||
val actor2 = actorOf[TestActor]("test-actor-2").start
|
||||
val actors = Actor.registry.local.actors
|
||||
assert(actors.size === 2)
|
||||
assert(actors.head.actor.isInstanceOf[TestActor])
|
||||
|
|
@ -88,13 +85,15 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
@Test
|
||||
def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach {
|
||||
Actor.registry.local.shutdownAll
|
||||
val actor1 = actorOf[TestActor]("test-actor-1")
|
||||
actor1.start
|
||||
val actor2 = actorOf[TestActor]("test-actor-2")
|
||||
actor2.start
|
||||
record = ""
|
||||
Actor.registry.local.foreach(actor ⇒ actor !! "ping")
|
||||
assert(record === "pongpong")
|
||||
val actor1 = actorOf[TestActor]("test-actor-1").start
|
||||
val actor2 = actorOf[TestActor]("test-actor-2").start
|
||||
val results = new ConcurrentLinkedQueue[Future[String]]
|
||||
|
||||
Actor.registry.local.foreach(actor ⇒ results.add(actor.!!))
|
||||
|
||||
assert(results.size === 2)
|
||||
val i = results.iterator
|
||||
while (i.hasNext) assert(i.next.get === "got ping")
|
||||
actor1.stop()
|
||||
actor2.stop()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,11 +7,25 @@ import akka.testing._
|
|||
import akka.testing.Testing.{ sleepFor, testMillis }
|
||||
import akka.util.duration._
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Actor._
|
||||
import akka.routing._
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.dispatch.{ KeptPromise, Future }
|
||||
import akka.actor.{ TypedActor, Actor }
|
||||
|
||||
object RoutingSpec {
|
||||
trait Foo {
|
||||
def sq(x: Int, sleep: Long): Future[Int]
|
||||
}
|
||||
|
||||
class FooImpl extends Foo {
|
||||
def sq(x: Int, sleep: Long): Future[Int] = {
|
||||
if (sleep > 0) Thread.sleep(sleep)
|
||||
new KeptPromise(Right(x * x))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RoutingSpec extends WordSpec with MustMatchers {
|
||||
import Routing._
|
||||
|
|
@ -491,6 +505,29 @@ class RoutingSpec extends WordSpec with MustMatchers {
|
|||
|
||||
pool.stop()
|
||||
}
|
||||
|
||||
"support typed actors" in {
|
||||
import RoutingSpec._
|
||||
import TypedActor._
|
||||
def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
|
||||
def lowerBound = 1
|
||||
def upperBound = 5
|
||||
def pressureThreshold = 1
|
||||
def partialFill = true
|
||||
def selectionCount = 1
|
||||
def rampupRate = 0.1
|
||||
def backoffRate = 0.50
|
||||
def backoffThreshold = 0.50
|
||||
def instance = getActorRefFor(typedActorOf[Foo, FooImpl]())
|
||||
def receive = _route
|
||||
}
|
||||
|
||||
val pool = createProxy[Foo](createPool)
|
||||
|
||||
val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100))
|
||||
|
||||
for ((i, r) ← results) r.get must equal(i * i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -208,7 +208,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
|
|||
* The reference sender future of the last received message.
|
||||
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
|
||||
*/
|
||||
def getSenderFuture: Option[CompletableFuture[Any]] = senderFuture
|
||||
def getSenderFuture: Option[Promise[Any]] = senderFuture
|
||||
|
||||
/**
|
||||
* Is the actor being restarted?
|
||||
|
|
@ -482,7 +482,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
|
|||
message: Any,
|
||||
timeout: Long,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T]
|
||||
senderFuture: Option[Promise[T]]): Promise[T]
|
||||
|
||||
protected[akka] def actorInstance: AtomicReference[Actor]
|
||||
|
||||
|
|
@ -698,10 +698,10 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
|
|||
message: Any,
|
||||
timeout: Long,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
|
||||
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
|
||||
senderFuture: Option[Promise[T]]): Promise[T] = {
|
||||
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultPromise[T](timeout))
|
||||
dispatcher dispatchMessage new MessageInvocation(
|
||||
this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
|
||||
this, message, senderOption, future.asInstanceOf[Some[Promise[Any]]])
|
||||
future.get
|
||||
}
|
||||
|
||||
|
|
@ -1022,7 +1022,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
message: Any,
|
||||
timeout: Long,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
|
||||
senderFuture: Option[Promise[T]]): Promise[T] = {
|
||||
val future = Actor.remote.send[T](
|
||||
message, senderOption, senderFuture,
|
||||
remoteAddress, timeout, false, this, loader)
|
||||
|
|
@ -1157,7 +1157,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
|
|||
* The reference sender future of the last received message.
|
||||
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
|
||||
*/
|
||||
def senderFuture(): Option[CompletableFuture[Any]] = {
|
||||
def senderFuture(): Option[Promise[Any]] = {
|
||||
val msg = currentMessage
|
||||
if (msg eq null) None
|
||||
else msg.senderFuture
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
|
|||
|
||||
//private val isClusterEnabled = ReflectiveAccess.isClusterEnabled
|
||||
private val actorsByAddress = new ConcurrentHashMap[String, ActorRef]
|
||||
private val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
|
||||
private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef]
|
||||
private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef]
|
||||
private val guard = new ReadWriteGuard
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
|
|||
// throw new IllegalStateException("Actor 'address' [" + address + "] is already in use, can't register actor [" + actor + "]")
|
||||
|
||||
actorsByAddress.put(address, actor)
|
||||
actorsByUuid.put(actor.uuid.toString, actor)
|
||||
actorsByUuid.put(actor.uuid, actor)
|
||||
notifyListeners(ActorRegistered(address, actor))
|
||||
}
|
||||
|
||||
|
|
@ -121,7 +121,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
|
|||
*/
|
||||
class LocalActorRegistry(
|
||||
private val actorsByAddress: ConcurrentHashMap[String, ActorRef],
|
||||
private val actorsByUuid: ConcurrentHashMap[String, ActorRef],
|
||||
private val actorsByUuid: ConcurrentHashMap[Uuid, ActorRef],
|
||||
private val typedActorsByUuid: ConcurrentHashMap[Uuid, AnyRef]) {
|
||||
|
||||
/**
|
||||
|
|
@ -153,11 +153,8 @@ class LocalActorRegistry(
|
|||
/**
|
||||
* Finds the actor that have a specific uuid.
|
||||
*/
|
||||
private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = {
|
||||
val uuidAsString = uuid.toString
|
||||
if (actorsByUuid.containsKey(uuidAsString)) Some(actorsByUuid.get(uuidAsString))
|
||||
else None
|
||||
}
|
||||
private[akka] def actorFor(uuid: Uuid): Option[ActorRef] =
|
||||
Option(actorsByUuid.get(uuid))
|
||||
|
||||
/**
|
||||
* Finds the typed actor that have a specific address.
|
||||
|
|
|
|||
|
|
@ -13,22 +13,23 @@ import java.util.concurrent.atomic.AtomicReference
|
|||
|
||||
object TypedActor {
|
||||
private val selfReference = new ThreadLocal[AnyRef]
|
||||
def self[T <: AnyRef] = selfReference.get.asInstanceOf[T]
|
||||
|
||||
class TypedActor[TI <: AnyRef](proxyRef: AtomicReference[AnyRef], createInstance: ⇒ TI) extends Actor {
|
||||
def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] match {
|
||||
case null ⇒ throw new IllegalStateException("Calling TypedActor.self outside of a TypedActor implementation method!")
|
||||
case some ⇒ some
|
||||
}
|
||||
|
||||
class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomicReference[R], createInstance: ⇒ T) extends Actor {
|
||||
val me = createInstance
|
||||
def callMethod(methodCall: MethodCall): Unit = methodCall match {
|
||||
case m if m.isOneWay ⇒ m(me)
|
||||
case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]]
|
||||
case m ⇒ self reply m(me)
|
||||
}
|
||||
def receive = {
|
||||
case m: MethodCall ⇒
|
||||
selfReference set proxyRef.get
|
||||
try {
|
||||
m match {
|
||||
case m if m.isOneWay ⇒ m(me)
|
||||
case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]]
|
||||
case m ⇒ self reply m(me)
|
||||
}
|
||||
} finally {
|
||||
selfReference set null
|
||||
}
|
||||
try { callMethod(m) } finally { selfReference set null }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -42,25 +43,25 @@ object TypedActor {
|
|||
case m if m.isOneWay ⇒
|
||||
actor ! m
|
||||
null
|
||||
case m if m.returnsJOption_? ⇒
|
||||
(actor !!! m).as[JOption[Any]] match {
|
||||
case Some(null) | None ⇒ JOption.none[Any]
|
||||
case Some(joption) ⇒ joption
|
||||
}
|
||||
case m if m.returnsOption_? ⇒
|
||||
(actor !!! m).as[AnyRef] match {
|
||||
case Some(null) | None ⇒ None
|
||||
case Some(option) ⇒ option
|
||||
}
|
||||
case m if m.returnsFuture_? ⇒
|
||||
actor !!! m
|
||||
case m if m.returnsJOption_? || m.returnsOption_? ⇒
|
||||
(actor !!! m).as[AnyRef] match {
|
||||
case Some(null) | None ⇒ if (m.returnsJOption_?) JOption.none[Any] else None
|
||||
case Some(joption) ⇒ joption
|
||||
}
|
||||
case m ⇒
|
||||
(actor !!! m).get
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class Configuration(timeout: Duration = Duration(Actor.TIMEOUT, "millis"), dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
|
||||
object Configuration {
|
||||
val defaultTimeout = Duration(Actor.TIMEOUT, "millis")
|
||||
val defaultConfiguration = new Configuration(defaultTimeout, Dispatchers.defaultGlobalDispatcher)
|
||||
def apply(): Configuration = defaultConfiguration
|
||||
}
|
||||
case class Configuration(timeout: Duration = Configuration.defaultTimeout, dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
|
||||
|
||||
case class MethodCall(method: Method, parameters: Array[AnyRef]) {
|
||||
def isOneWay = method.getReturnType == java.lang.Void.TYPE
|
||||
|
|
@ -83,39 +84,24 @@ object TypedActor {
|
|||
private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues)
|
||||
}
|
||||
|
||||
def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration): T =
|
||||
newTypedActor(Array[Class[_]](interface), impl.newInstance, config, interface.getClassLoader)
|
||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R =
|
||||
createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader)
|
||||
|
||||
def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration): T =
|
||||
newTypedActor(Array[Class[_]](interface), impl.create, config, interface.getClassLoader)
|
||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R =
|
||||
createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader)
|
||||
|
||||
def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration, loader: ClassLoader): T =
|
||||
newTypedActor(Array[Class[_]](interface), impl.newInstance, config, loader)
|
||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R =
|
||||
createProxyAndTypedActor(interface, impl.newInstance, config, loader)
|
||||
|
||||
def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration, loader: ClassLoader): T =
|
||||
newTypedActor(Array[Class[_]](interface), impl.create, config, loader)
|
||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R =
|
||||
createProxyAndTypedActor(interface, impl.create, config, loader)
|
||||
|
||||
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R =
|
||||
newTypedActor(impl.getInterfaces, impl.newInstance, config, loader)
|
||||
createProxyAndTypedActor(impl, impl.newInstance, config, loader)
|
||||
|
||||
def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
|
||||
val clazz = m.erasure.asInstanceOf[Class[T]]
|
||||
newTypedActor(clazz.getInterfaces, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader)
|
||||
}
|
||||
|
||||
protected def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = {
|
||||
val proxyRef = new AtomicReference[AnyRef](null)
|
||||
configureAndProxyLocalActorRef[T](interfaces, proxyRef, actorOf(new TypedActor[T](proxyRef, constructor)), config, loader)
|
||||
}
|
||||
|
||||
protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[AnyRef], actor: ActorRef, config: Configuration, loader: ClassLoader): T = {
|
||||
actor.timeout = config.timeout.toMillis
|
||||
actor.dispatcher = config.dispatcher
|
||||
|
||||
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T]
|
||||
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
|
||||
Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
|
||||
proxy
|
||||
createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader)
|
||||
}
|
||||
|
||||
def stop(typedActor: AnyRef): Boolean = getActorRefFor(typedActor) match {
|
||||
|
|
@ -133,4 +119,32 @@ object TypedActor {
|
|||
}
|
||||
|
||||
def isTypedActor(typedActor_? : AnyRef): Boolean = getActorRefFor(typedActor_?) ne null
|
||||
|
||||
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R =
|
||||
createProxy[R](extractInterfaces(interface), (ref: AtomicReference[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader)
|
||||
|
||||
def createProxy[R <: AnyRef](constructor: ⇒ Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R =
|
||||
createProxy[R](extractInterfaces(m.erasure), (ref: AtomicReference[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader)
|
||||
|
||||
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R =
|
||||
createProxy[R](interfaces, (ref: AtomicReference[R]) ⇒ constructor, config, loader)
|
||||
|
||||
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = {
|
||||
val proxyRef = new AtomicReference[R]
|
||||
configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef)), config, loader)
|
||||
}
|
||||
|
||||
protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = {
|
||||
actor.timeout = config.timeout.toMillis
|
||||
actor.dispatcher = config.dispatcher
|
||||
|
||||
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T]
|
||||
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
|
||||
Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
|
||||
proxy
|
||||
}
|
||||
|
||||
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] =
|
||||
if (clazz.isInterface) Array[Class[_]](clazz)
|
||||
else clazz.getInterfaces
|
||||
}
|
||||
|
|
@ -20,8 +20,6 @@ import java.lang.{ Iterable ⇒ JIterable }
|
|||
import java.util.{ LinkedList ⇒ JLinkedList }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.generic.CanBuildFrom
|
||||
import scala.collection.mutable.Builder
|
||||
import scala.collection.mutable.Stack
|
||||
|
||||
class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
||||
|
|
@ -56,7 +54,7 @@ object Futures {
|
|||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = {
|
||||
val futureResult = new DefaultCompletableFuture[T](timeout)
|
||||
val futureResult = new DefaultPromise[T](timeout)
|
||||
|
||||
val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _)
|
||||
for (f ← futures) f onComplete completeFirst
|
||||
|
|
@ -83,9 +81,9 @@ object Futures {
|
|||
*/
|
||||
def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = {
|
||||
if (futures.isEmpty) {
|
||||
new AlreadyCompletedFuture[R](Right(zero))
|
||||
new KeptPromise[R](Right(zero))
|
||||
} else {
|
||||
val result = new DefaultCompletableFuture[R](timeout)
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val results = new ConcurrentLinkedQueue[T]()
|
||||
val allDone = futures.size
|
||||
|
||||
|
|
@ -135,9 +133,9 @@ object Futures {
|
|||
*/
|
||||
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) ⇒ T): Future[R] = {
|
||||
if (futures.isEmpty)
|
||||
new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left")))
|
||||
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
|
||||
else {
|
||||
val result = new DefaultCompletableFuture[R](timeout)
|
||||
val result = new DefaultPromise[R](timeout)
|
||||
val seedFound = new AtomicBoolean(false)
|
||||
val seedFold: Future[T] ⇒ Unit = f ⇒ {
|
||||
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
|
||||
|
|
@ -202,7 +200,7 @@ object Futures {
|
|||
* in parallel.
|
||||
*
|
||||
* def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
|
||||
* in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
|
||||
* in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
|
||||
* val fb = fn(a.asInstanceOf[A])
|
||||
* for (r <- fr; b <-fb) yield (r += b)
|
||||
* }.map(_.result)
|
||||
|
|
@ -230,7 +228,7 @@ object Future {
|
|||
/**
|
||||
* Create an empty Future with default timeout
|
||||
*/
|
||||
def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout)
|
||||
def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout)
|
||||
|
||||
import scala.collection.mutable.Builder
|
||||
import scala.collection.generic.CanBuildFrom
|
||||
|
|
@ -240,7 +238,7 @@ object Future {
|
|||
* Useful for reducing many Futures into a single Future.
|
||||
*/
|
||||
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
|
||||
in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
|
||||
in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
|
||||
|
||||
/**
|
||||
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B].
|
||||
|
|
@ -251,7 +249,7 @@ object Future {
|
|||
* </pre>
|
||||
*/
|
||||
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
|
||||
in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒
|
||||
in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒
|
||||
val fb = fn(a.asInstanceOf[A])
|
||||
for (r ← fr; b ← fb) yield (r += b)
|
||||
}.map(_.result)
|
||||
|
|
@ -267,23 +265,19 @@ object Future {
|
|||
*
|
||||
* This allows working with Futures in an imperative style without blocking for each result.
|
||||
*
|
||||
* Completing a Future using 'CompletableFuture << Future' will also suspend execution until the
|
||||
* Completing a Future using 'Promise << Future' will also suspend execution until the
|
||||
* value of the other Future is available.
|
||||
*
|
||||
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
|
||||
*/
|
||||
def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = {
|
||||
val future = Promise[A](timeout)
|
||||
(reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒
|
||||
(reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒
|
||||
val opte = f.exception
|
||||
if (opte.isDefined) future completeWithException (opte.get)
|
||||
}
|
||||
future
|
||||
}
|
||||
|
||||
private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
|
||||
override def initialValue = None
|
||||
}
|
||||
}
|
||||
|
||||
sealed trait Future[+T] {
|
||||
|
|
@ -417,7 +411,7 @@ sealed trait Future[+T] {
|
|||
* </pre>
|
||||
*/
|
||||
final def collect[A](pf: PartialFunction[Any, A]): Future[A] = {
|
||||
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
|
||||
val fa = new DefaultPromise[A](timeoutInNanos, NANOS)
|
||||
onComplete { ft ⇒
|
||||
val v = ft.value.get
|
||||
fa complete {
|
||||
|
|
@ -450,7 +444,7 @@ sealed trait Future[+T] {
|
|||
* </pre>
|
||||
*/
|
||||
final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
|
||||
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
|
||||
val fa = new DefaultPromise[A](timeoutInNanos, NANOS)
|
||||
onComplete { ft ⇒
|
||||
val opte = ft.exception
|
||||
fa complete {
|
||||
|
|
@ -482,7 +476,7 @@ sealed trait Future[+T] {
|
|||
* </pre>
|
||||
*/
|
||||
final def map[A](f: T ⇒ A): Future[A] = {
|
||||
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
|
||||
val fa = new DefaultPromise[A](timeoutInNanos, NANOS)
|
||||
onComplete { ft ⇒
|
||||
val optv = ft.value
|
||||
if (optv.isDefined) {
|
||||
|
|
@ -518,7 +512,7 @@ sealed trait Future[+T] {
|
|||
* </pre>
|
||||
*/
|
||||
final def flatMap[A](f: T ⇒ Future[A]): Future[A] = {
|
||||
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
|
||||
val fa = new DefaultPromise[A](timeoutInNanos, NANOS)
|
||||
onComplete { ft ⇒
|
||||
val optv = ft.value
|
||||
if (optv.isDefined) {
|
||||
|
|
@ -546,7 +540,7 @@ sealed trait Future[+T] {
|
|||
}
|
||||
|
||||
final def filter(p: Any ⇒ Boolean): Future[Any] = {
|
||||
val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS)
|
||||
val f = new DefaultPromise[T](timeoutInNanos, NANOS)
|
||||
onComplete { ft ⇒
|
||||
val optv = ft.value
|
||||
if (optv.isDefined) {
|
||||
|
|
@ -596,16 +590,19 @@ sealed trait Future[+T] {
|
|||
|
||||
object Promise {
|
||||
|
||||
def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout)
|
||||
def apply[A](timeout: Long): Promise[A] = new DefaultPromise[A](timeout)
|
||||
|
||||
def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT)
|
||||
def apply[A](): Promise[A] = apply(Actor.TIMEOUT)
|
||||
|
||||
private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
|
||||
override def initialValue = None
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Essentially this is the Promise (or write-side) of a Future (read-side).
|
||||
*/
|
||||
trait CompletableFuture[T] extends Future[T] {
|
||||
trait Promise[T] extends Future[T] {
|
||||
/**
|
||||
* Completes this Future with the specified result, if not already completed.
|
||||
* @return this
|
||||
|
|
@ -637,7 +634,7 @@ trait CompletableFuture[T] extends Future[T] {
|
|||
final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) }
|
||||
|
||||
final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒
|
||||
val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT)
|
||||
val fr = new DefaultPromise[Any](Actor.TIMEOUT)
|
||||
this completeWith other onComplete { f ⇒
|
||||
try {
|
||||
fr completeWith cont(f)
|
||||
|
|
@ -655,7 +652,7 @@ trait CompletableFuture[T] extends Future[T] {
|
|||
/**
|
||||
* The default concrete Future implementation.
|
||||
*/
|
||||
class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] {
|
||||
class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
|
||||
|
||||
def this() = this(0, MILLIS)
|
||||
|
||||
|
|
@ -722,7 +719,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
|
|||
}
|
||||
}
|
||||
|
||||
def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = {
|
||||
def complete(value: Either[Throwable, T]): DefaultPromise[T] = {
|
||||
_lock.lock
|
||||
val notifyTheseListeners = try {
|
||||
if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired
|
||||
|
|
@ -746,7 +743,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
|
|||
}
|
||||
}
|
||||
|
||||
val pending = Future.callbacksPendingExecution.get
|
||||
val pending = Promise.callbacksPendingExecution.get
|
||||
if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow)
|
||||
pending.get.push(() ⇒ { // Linearize/aggregate callbacks at top level and then execute
|
||||
val doNotify = notifyCompleted _ //Hoist closure to avoid garbage
|
||||
|
|
@ -755,16 +752,16 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
|
|||
} else {
|
||||
try {
|
||||
val callbacks = Stack[() ⇒ Unit]() // Allocate new aggregator for pending callbacks
|
||||
Future.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator
|
||||
Promise.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator
|
||||
runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated
|
||||
} finally { Future.callbacksPendingExecution.set(None) } // Ensure cleanup
|
||||
} finally { Promise.callbacksPendingExecution.set(None) } // Ensure cleanup
|
||||
}
|
||||
}
|
||||
|
||||
this
|
||||
}
|
||||
|
||||
def onComplete(func: Future[T] ⇒ Unit): CompletableFuture[T] = {
|
||||
def onComplete(func: Future[T] ⇒ Unit): Promise[T] = {
|
||||
_lock.lock
|
||||
val notifyNow = try {
|
||||
if (_value.isEmpty) {
|
||||
|
|
@ -800,10 +797,10 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
|
|||
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
|
||||
* a Future-composition but you already have a value to contribute.
|
||||
*/
|
||||
sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] {
|
||||
sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] {
|
||||
val value = Some(suppliedValue)
|
||||
|
||||
def complete(value: Either[Throwable, T]): CompletableFuture[T] = this
|
||||
def complete(value: Either[Throwable, T]): Promise[T] = this
|
||||
def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this }
|
||||
def await(atMost: Duration): Future[T] = this
|
||||
def await: Future[T] = this
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import akka.actor._
|
|||
final case class MessageInvocation(receiver: ActorRef,
|
||||
message: Any,
|
||||
sender: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[Any]]) {
|
||||
senderFuture: Option[Promise[Any]]) {
|
||||
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
|
||||
|
||||
def invoke() {
|
||||
|
|
@ -32,7 +32,7 @@ final case class MessageInvocation(receiver: ActorRef,
|
|||
}
|
||||
}
|
||||
|
||||
final case class FutureInvocation[T](future: CompletableFuture[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable {
|
||||
final case class FutureInvocation[T](future: Promise[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable {
|
||||
def run() {
|
||||
future complete (try {
|
||||
Right(function())
|
||||
|
|
@ -99,7 +99,7 @@ trait MessageDispatcher {
|
|||
private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Long): Future[T] = {
|
||||
futures.getAndIncrement()
|
||||
try {
|
||||
val future = new DefaultCompletableFuture[T](timeout)
|
||||
val future = new DefaultPromise[T](timeout)
|
||||
|
||||
if (active.isOff)
|
||||
guard withGuard {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.remoteinterface
|
|||
import akka.japi.Creator
|
||||
import akka.actor._
|
||||
import akka.util._
|
||||
import akka.dispatch.CompletableFuture
|
||||
import akka.dispatch.Promise
|
||||
import akka.serialization._
|
||||
import akka.AkkaException
|
||||
|
||||
|
|
@ -300,10 +300,10 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule ⇒
|
|||
|
||||
protected[akka] def send[T](message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]],
|
||||
senderFuture: Option[Promise[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
timeout: Long,
|
||||
isOneWay: Boolean,
|
||||
actorRef: ActorRef,
|
||||
loader: Option[ClassLoader]): Option[CompletableFuture[T]]
|
||||
loader: Option[ClassLoader]): Option[Promise[T]]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.util
|
||||
|
||||
import akka.dispatch.{ Future, CompletableFuture, MessageInvocation }
|
||||
import akka.dispatch.{ Future, Promise, MessageInvocation }
|
||||
import akka.config.{ Config, ModuleNotAvailableException }
|
||||
import akka.remoteinterface.RemoteSupport
|
||||
import akka.actor._
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import Cluster._
|
|||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.CompletableFuture
|
||||
import akka.dispatch.Promise
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
|
@ -42,8 +42,8 @@ class ClusterActorRef private[akka] (
|
|||
message: Any,
|
||||
timeout: Long,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] =
|
||||
route[T](message, timeout)(senderOption).asInstanceOf[CompletableFuture[T]]
|
||||
senderFuture: Option[Promise[T]]): Promise[T] =
|
||||
route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]]
|
||||
|
||||
private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress) {
|
||||
addresses set (addresses.get map {
|
||||
|
|
|
|||
111
akka-cluster/src/main/scala/akka/cluster/RawStorage.scala
Normal file
111
akka-cluster/src/main/scala/akka/cluster/RawStorage.scala
Normal file
|
|
@ -0,0 +1,111 @@
|
|||
package akka.cluster
|
||||
|
||||
import zookeeper.AkkaZkClient
|
||||
import java.lang.UnsupportedOperationException
|
||||
import akka.AkkaException
|
||||
import org.apache.zookeeper.{ KeeperException, CreateMode }
|
||||
import org.apache.zookeeper.data.Stat
|
||||
import scala.Some
|
||||
|
||||
/**
|
||||
* Simple abstraction to store an Array of bytes based on some String key.
|
||||
*
|
||||
* Nothing is being said about ACID, transactions etc. It depends on the implementation
|
||||
* of this Storage interface of what is and isn't done on the lowest level.
|
||||
*
|
||||
* TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking.
|
||||
* (This is supported by ZooKeeper).
|
||||
* TODO: Class is up for better names.
|
||||
* TODO: Instead of a String as key, perhaps also a byte-array.
|
||||
*/
|
||||
trait RawStorage {
|
||||
|
||||
/**
|
||||
* Inserts a byte-array based on some key.
|
||||
*
|
||||
* TODO: What happens when given key already exists
|
||||
* @throws NodeExistsException
|
||||
*/
|
||||
def insert(key: String, bytes: Array[Byte]): Unit
|
||||
|
||||
/**
|
||||
* Stores a array of bytes based on some key.
|
||||
*
|
||||
* TODO: What happens when the given key doesn't exist yet
|
||||
*/
|
||||
def update(key: String, bytes: Array[Byte]): Unit
|
||||
|
||||
/**
|
||||
* Loads the given entry. If it exists, a 'Some[Array[Byte]]' will be returned, else a None.
|
||||
*/
|
||||
def load(key: String): Option[Array[Byte]]
|
||||
}
|
||||
|
||||
/**
|
||||
* An AkkaException thrown by the RawStorage module.
|
||||
*/
|
||||
class RawStorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause)
|
||||
|
||||
/**
|
||||
* *
|
||||
* A RawStorageException thrown when an operation is done on a non existing node.
|
||||
*/
|
||||
class MissingNodeException(msg: String = null, cause: java.lang.Throwable = null) extends RawStorageException(msg, cause)
|
||||
|
||||
/**
|
||||
* A RawStorageException thrown when an operation is done on an existing node, but no node was expected.
|
||||
*/
|
||||
class NodeExistsException(msg: String = null, cause: java.lang.Throwable = null) extends RawStorageException(msg, cause)
|
||||
|
||||
/**
|
||||
* A RawStorage implementation based on ZooKeeper.
|
||||
*
|
||||
* The store method is atomic:
|
||||
* - so everything is written or nothing is written
|
||||
* - is isolated, so threadsafe,
|
||||
* but it will not participate in any transactions.
|
||||
* //todo: unclear, is only a single connection used in the JVM??
|
||||
*
|
||||
*/
|
||||
class ZooKeeperRawStorage(zkClient: AkkaZkClient) extends RawStorage {
|
||||
|
||||
override def insert(key: String, bytes: Array[Byte]) {
|
||||
try {
|
||||
zkClient.connection.create(key, bytes, CreateMode.PERSISTENT);
|
||||
} catch {
|
||||
case e: KeeperException.NodeExistsException ⇒ throw new NodeExistsException("failed to insert key" + key, e)
|
||||
case e: KeeperException ⇒ throw new RawStorageException("failed to insert key" + key, e)
|
||||
}
|
||||
}
|
||||
|
||||
override def load(key: String) = try {
|
||||
Some(zkClient.connection.readData(key, new Stat, false))
|
||||
} catch {
|
||||
case e: KeeperException.NoNodeException ⇒ None
|
||||
case e: KeeperException ⇒ throw new RawStorageException("failed to load key" + key, e)
|
||||
}
|
||||
|
||||
override def update(key: String, bytes: Array[Byte]) {
|
||||
try {
|
||||
zkClient.connection.writeData(key, bytes)
|
||||
} catch {
|
||||
case e: KeeperException.NoNodeException ⇒ throw new MissingNodeException("failed to update key", e)
|
||||
case e: KeeperException ⇒ throw new RawStorageException("failed to update key", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class VoldemortRawStorage extends RawStorage {
|
||||
|
||||
def load(Key: String) = {
|
||||
throw new UnsupportedOperationException()
|
||||
}
|
||||
|
||||
override def insert(key: String, bytes: Array[Byte]) {
|
||||
throw new UnsupportedOperationException()
|
||||
}
|
||||
|
||||
def update(key: String, bytes: Array[Byte]) {
|
||||
throw new UnsupportedOperationException()
|
||||
}
|
||||
}
|
||||
|
|
@ -86,7 +86,7 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String)
|
|||
message: Any,
|
||||
timeout: Long,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture)
|
||||
senderFuture: Option[Promise[T]]): Promise[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture)
|
||||
protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance
|
||||
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
|
||||
actorRef.supervisor_=(sup)
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import akka.config._
|
|||
import Config._
|
||||
import akka.util._
|
||||
import akka.event.EventHandler
|
||||
import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture }
|
||||
import akka.dispatch.{ DefaultPromise, Promise }
|
||||
import akka.AkkaException
|
||||
|
||||
import akka.cluster.zookeeper._
|
||||
|
|
@ -140,7 +140,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync:
|
|||
"Reading entries [%s -> %s] for log [%s]".format(from, to, logId))
|
||||
|
||||
if (isAsync) {
|
||||
val future = new DefaultCompletableFuture[Vector[Array[Byte]]](timeout)
|
||||
val future = new DefaultPromise[Vector[Array[Byte]]](timeout)
|
||||
ledger.asyncReadEntries(
|
||||
from, to,
|
||||
new AsyncCallback.ReadCallback {
|
||||
|
|
@ -149,7 +149,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync:
|
|||
ledgerHandle: LedgerHandle,
|
||||
enumeration: Enumeration[LedgerEntry],
|
||||
ctx: AnyRef) {
|
||||
val future = ctx.asInstanceOf[CompletableFuture[Vector[Array[Byte]]]]
|
||||
val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]]
|
||||
var entries = Vector[Array[Byte]]()
|
||||
while (enumeration.hasMoreElements) {
|
||||
entries = entries :+ enumeration.nextElement.getEntry
|
||||
|
|
@ -362,7 +362,7 @@ object TransactionLog {
|
|||
if (zkClient.exists(txLogPath)) throw new ReplicationException(
|
||||
"Transaction log for UUID [" + id + "] already exists")
|
||||
|
||||
val future = new DefaultCompletableFuture[LedgerHandle](timeout)
|
||||
val future = new DefaultPromise[LedgerHandle](timeout)
|
||||
if (isAsync) {
|
||||
bookieClient.asyncCreateLedger(
|
||||
ensembleSize, quorumSize, digestType, password,
|
||||
|
|
@ -371,7 +371,7 @@ object TransactionLog {
|
|||
returnCode: Int,
|
||||
ledgerHandle: LedgerHandle,
|
||||
ctx: AnyRef) {
|
||||
val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]]
|
||||
val future = ctx.asInstanceOf[Promise[LedgerHandle]]
|
||||
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
|
||||
else future.completeWithException(BKException.create(returnCode))
|
||||
}
|
||||
|
|
@ -422,7 +422,7 @@ object TransactionLog {
|
|||
|
||||
val ledger = try {
|
||||
if (isAsync) {
|
||||
val future = new DefaultCompletableFuture[LedgerHandle](timeout)
|
||||
val future = new DefaultPromise[LedgerHandle](timeout)
|
||||
bookieClient.asyncOpenLedger(
|
||||
logId, digestType, password,
|
||||
new AsyncCallback.OpenCallback {
|
||||
|
|
@ -430,7 +430,7 @@ object TransactionLog {
|
|||
returnCode: Int,
|
||||
ledgerHandle: LedgerHandle,
|
||||
ctx: AnyRef) {
|
||||
val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]]
|
||||
val future = ctx.asInstanceOf[Promise[LedgerHandle]]
|
||||
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
|
||||
else future.completeWithException(BKException.create(returnCode))
|
||||
}
|
||||
|
|
@ -447,7 +447,7 @@ object TransactionLog {
|
|||
TransactionLog(ledger, id, isAsync)
|
||||
}
|
||||
|
||||
private[akka] def await[T](future: CompletableFuture[T]): T = {
|
||||
private[akka] def await[T](future: Promise[T]): T = {
|
||||
future.await
|
||||
if (future.result.isDefined) future.result.get
|
||||
else if (future.exception.isDefined) handleError(future.exception.get)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ Dataflow Concurrency (Java)
|
|||
Introduction
|
||||
------------
|
||||
|
||||
**IMPORTANT: As of Akka 1.1, Akka Future, CompletableFuture and DefaultCompletableFuture have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.**
|
||||
**IMPORTANT: As of Akka 1.1, Akka Future, Promise and DefaultPromise have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.**
|
||||
|
||||
Akka implements `Oz-style dataflow concurrency <http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency>`_ through dataflow (single assignment) variables and lightweight (event-based) processes/threads.
|
||||
|
||||
|
|
|
|||
|
|
@ -308,9 +308,9 @@ Reply using the sender future
|
|||
|
||||
If a message was sent with the 'sendRequestReply' or 'sendRequestReplyFuture' methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the 'reply' method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.
|
||||
|
||||
The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option<CompletableFuture> getSenderFuture()'.
|
||||
The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option<Promise> getSenderFuture()'.
|
||||
|
||||
CompletableFuture is a future with methods for 'completing the future:
|
||||
Promise is a future with methods for 'completing the future:
|
||||
* completeWithResult(..)
|
||||
* completeWithException(..)
|
||||
|
||||
|
|
|
|||
|
|
@ -331,7 +331,7 @@ Reply using the sender future
|
|||
|
||||
If a message was sent with the ``!!`` or ``!!!`` methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the ``reply`` method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.
|
||||
|
||||
The reference to the Future resides in the ``senderFuture: Option[CompletableFuture[_]]`` member field in the ``ActorRef`` class.
|
||||
The reference to the Future resides in the ``senderFuture: Option[Promise[_]]`` member field in the ``ActorRef`` class.
|
||||
|
||||
Here is an example of how it can be used:
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.remote.netty
|
||||
|
||||
import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture, Future }
|
||||
import akka.dispatch.{ DefaultPromise, Promise, Future }
|
||||
import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings }
|
||||
import akka.remote.protocol.RemoteProtocol._
|
||||
import akka.serialization.RemoteActorSerialization
|
||||
|
|
@ -73,12 +73,12 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
|
|||
|
||||
protected[akka] def send[T](message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]],
|
||||
senderFuture: Option[Promise[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
timeout: Long,
|
||||
isOneWay: Boolean,
|
||||
actorRef: ActorRef,
|
||||
loader: Option[ClassLoader]): Option[CompletableFuture[T]] =
|
||||
loader: Option[ClassLoader]): Option[Promise[T]] =
|
||||
withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef))
|
||||
|
||||
private[akka] def withClientFor[T](
|
||||
|
|
@ -154,7 +154,7 @@ abstract class RemoteClient private[akka] (
|
|||
remoteAddress.getAddress.getHostAddress + "::" +
|
||||
remoteAddress.getPort
|
||||
|
||||
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
|
||||
protected val futures = new ConcurrentHashMap[Uuid, Promise[_]]
|
||||
protected val pendingRequests = {
|
||||
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
|
||||
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
|
||||
|
|
@ -191,11 +191,11 @@ abstract class RemoteClient private[akka] (
|
|||
def send[T](
|
||||
message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]],
|
||||
senderFuture: Option[Promise[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
timeout: Long,
|
||||
isOneWay: Boolean,
|
||||
actorRef: ActorRef): Option[CompletableFuture[T]] =
|
||||
actorRef: ActorRef): Option[Promise[T]] =
|
||||
send(createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build,
|
||||
senderFuture)
|
||||
|
|
@ -205,7 +205,7 @@ abstract class RemoteClient private[akka] (
|
|||
*/
|
||||
def send[T](
|
||||
request: RemoteMessageProtocol,
|
||||
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
|
||||
senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
|
||||
if (isRunning) {
|
||||
if (request.getOneWay) {
|
||||
try {
|
||||
|
|
@ -227,7 +227,7 @@ abstract class RemoteClient private[akka] (
|
|||
None
|
||||
} else {
|
||||
val futureResult = if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
|
||||
else new DefaultPromise[T](request.getActorInfo.getTimeout)
|
||||
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
|
||||
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
|
||||
|
||||
|
|
@ -410,7 +410,7 @@ class ActiveRemoteClient private[akka] (
|
|||
*/
|
||||
class ActiveRemoteClientPipelineFactory(
|
||||
name: String,
|
||||
futures: ConcurrentMap[Uuid, CompletableFuture[_]],
|
||||
futures: ConcurrentMap[Uuid, Promise[_]],
|
||||
bootstrap: ClientBootstrap,
|
||||
remoteAddress: InetSocketAddress,
|
||||
timer: HashedWheelTimer,
|
||||
|
|
@ -439,7 +439,7 @@ class ActiveRemoteClientPipelineFactory(
|
|||
@ChannelHandler.Sharable
|
||||
class ActiveRemoteClientHandler(
|
||||
val name: String,
|
||||
val futures: ConcurrentMap[Uuid, CompletableFuture[_]],
|
||||
val futures: ConcurrentMap[Uuid, Promise[_]],
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: InetSocketAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
|
|
@ -457,7 +457,7 @@ class ActiveRemoteClientHandler(
|
|||
case arp: AkkaRemoteProtocol if arp.hasMessage ⇒
|
||||
val reply = arp.getMessage
|
||||
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
|
||||
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
|
||||
val future = futures.remove(replyUuid).asInstanceOf[Promise[Any]]
|
||||
|
||||
if (reply.hasMessage) {
|
||||
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
|
||||
|
|
@ -891,7 +891,7 @@ class RemoteServerHandler(
|
|||
message,
|
||||
request.getActorInfo.getTimeout,
|
||||
None,
|
||||
Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout).
|
||||
Some(new DefaultPromise[Any](request.getActorInfo.getTimeout).
|
||||
onComplete(_.value.get match {
|
||||
case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request))
|
||||
case r: Right[Throwable, Any] ⇒
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.agent
|
|||
import akka.stm._
|
||||
import akka.actor.Actor
|
||||
import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc }
|
||||
import akka.dispatch.{ DefaultCompletableFuture, Dispatchers, Future }
|
||||
import akka.dispatch.{ DefaultPromise, Dispatchers, Future }
|
||||
|
||||
/**
|
||||
* Used internally to send functions.
|
||||
|
|
@ -122,7 +122,7 @@ class Agent[T](initialValue: T) {
|
|||
def alter(f: T ⇒ T)(timeout: Long): Future[T] = {
|
||||
def dispatch = updater.!!!(Update(f), timeout)
|
||||
if (Stm.activeTransaction) {
|
||||
val result = new DefaultCompletableFuture[T](timeout)
|
||||
val result = new DefaultPromise[T](timeout)
|
||||
get //Join xa
|
||||
deferred {
|
||||
result completeWith dispatch
|
||||
|
|
@ -164,7 +164,7 @@ class Agent[T](initialValue: T) {
|
|||
* still be executed in order.
|
||||
*/
|
||||
def alterOff(f: T ⇒ T)(timeout: Long): Future[T] = {
|
||||
val result = new DefaultCompletableFuture[T](timeout)
|
||||
val result = new DefaultPromise[T](timeout)
|
||||
send((value: T) ⇒ {
|
||||
suspend
|
||||
val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue