Partial work + broken commit

This commit is contained in:
Viktor Klang 2012-07-04 15:25:30 +02:00
parent 3911b18069
commit 52d33113d9
50 changed files with 228 additions and 1092 deletions

View file

@ -5,6 +5,8 @@ import akka.actor.ActorSystem;
import akka.japi.*;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.util.Duration;
import akka.testkit.TestKitExtension;
import org.junit.AfterClass;
@ -54,7 +56,7 @@ public class JavaFutureTests {
public String apply(String s) {
return s + " World";
}
});
}, system.dispatcher());
assertEquals("Hello World", Await.result(f2, timeout));
}
@ -63,13 +65,13 @@ public class JavaFutureTests {
public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
Future<String> f = cf.future();
f.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {
if (result.equals("foo"))
latch.countDown();
}
});
}, system.dispatcher());
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
@ -80,13 +82,13 @@ public class JavaFutureTests {
public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
Future<String> f = cf.future();
f.onFailure(new OnFailure() {
public void onFailure(Throwable t) {
if (t instanceof NullPointerException)
latch.countDown();
}
});
}, system.dispatcher());
Throwable exception = new NullPointerException();
cf.failure(exception);
@ -98,12 +100,12 @@ public class JavaFutureTests {
public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
Future<String> f = cf.future();
f.onComplete(new OnComplete<String>() {
public void onComplete(Throwable t, String r) {
latch.countDown();
}
});
}, system.dispatcher());
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
@ -114,12 +116,12 @@ public class JavaFutureTests {
public void mustBeAbleToForeachAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
Future<String> f = cf.future();
f.foreach(new Foreach<String>() {
public void each(String future) {
latch.countDown();
}
});
},system.dispatcher());
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
@ -131,16 +133,16 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
cf.success("1000");
Future<String> f = cf;
Future<String> f = cf.future();
Future<Integer> r = f.flatMap(new Mapper<String, Future<Integer>>() {
public Future<Integer> checkedApply(String r) throws Throwable {
if (false) throw new IOException("Just here to make sure this compiles.");
latch.countDown();
Promise<Integer> cf = Futures.promise(system.dispatcher());
cf.success(Integer.parseInt(r));
return cf;
return cf.future();
}
});
}, system.dispatcher());
assertEquals(Await.result(f, timeout), "1000");
assertEquals(Await.result(r, timeout).intValue(), 1000);
@ -151,13 +153,13 @@ public class JavaFutureTests {
public void mustBeAbleToFilterAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
Future<String> f = cf.future();
Future<String> r = f.filter(Filter.filterOf(new Function<String, Boolean>() {
public Boolean apply(String r) {
latch.countDown();
return r.equals("foo");
}
}));
}), system.dispatcher());
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
@ -281,8 +283,8 @@ public class JavaFutureTests {
Promise<String> p = Futures.promise(system.dispatcher());
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.success("foo");
Await.ready(p, d);
assertEquals(Await.result(p, d), "foo");
Await.ready(p.future(), d);
assertEquals(Await.result(p.future(), d), "foo");
}
@Test
@ -291,8 +293,8 @@ public class JavaFutureTests {
Future<String> f = p.future().mapTo(classTag(String.class));
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.success("foo");
Await.ready(p, d);
assertEquals(Await.result(p, d), "foo");
Await.ready(p.future(), d);
assertEquals(Await.result(p.future(), d), "foo");
}
@Test
@ -306,7 +308,7 @@ public class JavaFutureTests {
else
throw t;
}
});
}, system.dispatcher());
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.failure(fail);
assertEquals(Await.result(f, d), "foo");
@ -323,7 +325,7 @@ public class JavaFutureTests {
else
throw t;
}
});
}, system.dispatcher());
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.failure(fail);
assertEquals(Await.result(f, d), "foo");

View file

@ -14,7 +14,7 @@ import akka.util.Timeout
import scala.concurrent.util.duration._
import scala.concurrent.Await
import java.lang.IllegalStateException
import akka.dispatch.Promise
import scala.concurrent.Promise
import akka.pattern.ask
import akka.serialization.JavaSerializer
@ -131,7 +131,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
def wrap[T](f: Promise[Actor] T): T = {
val result = Promise[Actor]()
val r = f(result)
Await.result(result, 1 minute)
Await.result(result.future, 1 minute)
r
}

View file

@ -14,7 +14,7 @@ import scala.collection.JavaConverters
import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue }
import akka.pattern.ask
import akka.util.Timeout
import akka.dispatch.Future
import scala.concurrent.Future
class JavaExtensionSpec extends JavaExtension with JUnitSuite

View file

@ -7,12 +7,12 @@ package akka.actor
import language.postfixOps
import akka.util.ByteString
import scala.concurrent.{ ExecutionContext, Await }
import scala.concurrent.{ ExecutionContext, Await, Future, Promise }
import scala.concurrent.util.{ Duration, Deadline }
import scala.concurrent.util.duration._
import scala.util.continuations._
import akka.testkit._
import akka.dispatch.{ Future, Promise, MessageDispatcher }
import akka.dispatch.MessageDispatcher
import java.net.{ SocketAddress }
import akka.pattern.ask
@ -246,7 +246,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
*/
def retry[T](count: Option[Int] = None, timeout: Option[Duration] = None, delay: Option[Duration] = Some(100 millis), filter: Option[Throwable Boolean] = None)(future: Future[T])(implicit executor: ExecutionContext): Future[T] = {
val promise = Promise[T]()(executor)
val promise = Promise[T]()
val timer: Option[Deadline] = timeout match {
case Some(duration) Some(duration fromNow)
@ -271,7 +271,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
run(0)
promise
promise.future
}
"an IO Actor" must {
@ -279,7 +279,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
filterException[java.net.ConnectException] {
val addressPromise = Promise[SocketAddress]()
val server = system.actorOf(Props(new SimpleEchoServer(addressPromise)))
val address = Await.result(addressPromise, TestLatch.DefaultTimeout)
val address = Await.result(addressPromise.future, TestLatch.DefaultTimeout)
val client = system.actorOf(Props(new SimpleEchoClient(address)))
val f1 = retry() { client ? ByteString("Hello World!1") }
val f2 = retry() { client ? ByteString("Hello World!2") }
@ -296,7 +296,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
filterException[java.net.ConnectException] {
val addressPromise = Promise[SocketAddress]()
val server = system.actorOf(Props(new SimpleEchoServer(addressPromise)))
val address = Await.result(addressPromise, TestLatch.DefaultTimeout)
val address = Await.result(addressPromise.future, TestLatch.DefaultTimeout)
val client = system.actorOf(Props(new SimpleEchoClient(address)))
val list = List.range(0, 100)
val f = Future.traverse(list)(i retry() { client ? ByteString(i.toString) })
@ -310,7 +310,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
filterException[java.net.ConnectException] {
val addressPromise = Promise[SocketAddress]()
val server = system.actorOf(Props(new KVStore(addressPromise)))
val address = Await.result(addressPromise, TestLatch.DefaultTimeout)
val address = Await.result(addressPromise.future, TestLatch.DefaultTimeout)
val client1 = system.actorOf(Props(new KVClient(address)))
val client2 = system.actorOf(Props(new KVClient(address)))
val f1 = retry() { client1 ? KVSet("hello", "World") }

View file

@ -10,7 +10,7 @@ import akka.testkit._
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.util.Timeout
import akka.dispatch.Future
import scala.concurrent.Future
object LocalActorRefProviderSpec {
val config = """

View file

@ -7,16 +7,15 @@ import language.postfixOps
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
import akka.serialization.SerializationExtension
import akka.japi.{ Creator, Option JOption }
import akka.japi.{ Option JOption }
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Dispatchers, Future, Promise }
import akka.dispatch.{ Dispatchers }
import akka.pattern.ask
import akka.serialization.JavaSerializer
import akka.actor.TypedActor._
@ -110,7 +109,7 @@ object TypedActorSpec {
def pigdog = "Pigdog"
def futurePigdog(): Future[String] = Promise.successful(pigdog)
def futurePigdog(): Future[String] = Promise.successful(pigdog).future
def futurePigdog(delay: Long): Future[String] = {
Thread.sleep(delay)
@ -119,7 +118,7 @@ object TypedActorSpec {
def futurePigdog(delay: Long, numbered: Int): Future[String] = {
Thread.sleep(delay)
Promise.successful(pigdog + numbered)
Promise.successful(pigdog + numbered).future
}
def futureComposePigdogFrom(foo: Foo): Future[String] = {

View file

@ -23,7 +23,7 @@ import akka.testkit._
import akka.util.{ Timeout, Switch }
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import scala.concurrent.Await
import scala.concurrent.{ Await, Future, Promise }
import scala.annotation.tailrec
object ActorModelSpec {
@ -413,9 +413,9 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
val a = newTestActor(dispatcher.id)
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(new ActorInterruptedException(ie)) }
val f3 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(new ActorInterruptedException(ie)).future }
val f4 = a ? Reply("foo2")
val f5 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(new ActorInterruptedException(ie)) }
val f5 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(new ActorInterruptedException(ie)).future }
val f6 = a ? Reply("bar2")
assert(Await.result(f1, timeout.duration) === "foo")

View file

@ -6,7 +6,7 @@ package akka.dataflow
import language.postfixOps
import akka.actor.{ Actor, Props }
import akka.dispatch.Future
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.util.duration._
import akka.testkit.{ AkkaSpec, DefaultTimeout }

View file

@ -10,7 +10,7 @@ import org.scalacheck.Prop._
import org.scalacheck.Gen._
import akka.actor._
import akka.testkit.{ EventFilter, filterEvents, filterException, AkkaSpec, DefaultTimeout, TestLatch }
import scala.concurrent.Await
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.util.duration._
import scala.concurrent.ExecutionContext
import org.scalatest.junit.JUnitSuite
@ -48,12 +48,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"A Promise" when {
"never completed" must {
behave like emptyFuture(_(Promise()))
behave like emptyFuture(_(Promise().future))
"return supplied value on timeout" in {
val failure = Promise.failed[String](new RuntimeException("br0ken"))
val otherFailure = Promise.failed[String](new RuntimeException("last"))
val empty = Promise[String]()
val timedOut = Promise.successful[String]("Timedout")
val failure = Promise.failed[String](new RuntimeException("br0ken")).future
val otherFailure = Promise.failed[String](new RuntimeException("last")).future
val empty = Promise[String]().future
val timedOut = Promise.successful[String]("Timedout").future
Await.result(failure fallbackTo timedOut, timeout.duration) must be("Timedout")
Await.result(timedOut fallbackTo empty, timeout.duration) must be("Timedout")
@ -65,22 +65,22 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"completed with a result" must {
val result = "test value"
val future = Promise[String]().complete(Right(result))
val future = Promise[String]().complete(Right(result)).future
behave like futureWithResult(_(future, result))
}
"completed with an exception" must {
val message = "Expected Exception"
val future = Promise[String]().complete(Left(new RuntimeException(message)))
val future = Promise[String]().complete(Left(new RuntimeException(message))).future
behave like futureWithException[RuntimeException](_(future, message))
}
"completed with an InterruptedException" must {
val message = "Boxed InterruptedException"
val future = Promise[String]().complete(Left(new InterruptedException(message)))
val future = Promise[String]().complete(Left(new InterruptedException(message))).future
behave like futureWithException[RuntimeException](_(future, message))
}
"completed with a NonLocalReturnControl" must {
val result = "test value"
val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result)))
val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))).future
behave like futureWithResult(_(future, result))
}
@ -94,11 +94,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val B = namedCtx("B")
// create a promise with ctx A
val p = Promise[String]()(A)
val p = Promise[String]()
// I would expect that any callback from p
// is executed in the context of p
val result = p map { _ + Thread.currentThread().getName() }
val result = p.future map { _ + Thread.currentThread().getName() }
p.completeWith(Future { "Hi " }(B))
try {
@ -332,15 +332,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"recoverWith from exceptions" in {
val o = new IllegalStateException("original")
val r = new IllegalStateException("recovered")
val yay = Promise.successful("yay!").future
intercept[IllegalStateException] {
Await.result(Promise.failed[String](o) recoverWith { case _ if false == true Promise.successful("yay!") }, timeout.duration)
Await.result(Promise.failed[String](o).future recoverWith { case _ if false == true yay }, timeout.duration)
} must be(o)
Await.result(Promise.failed[String](o) recoverWith { case _ Promise.successful("yay!") }, timeout.duration) must equal("yay!")
Await.result(Promise.failed[String](o).future recoverWith { case _ yay }, timeout.duration) must equal("yay!")
intercept[IllegalStateException] {
Await.result(Promise.failed[String](o) recoverWith { case _ Promise.failed[String](r) }, timeout.duration)
Await.result(Promise.failed[String](o).future recoverWith { case _ Promise.failed[String](r).future }, timeout.duration)
} must be(r)
}
@ -356,7 +357,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"firstCompletedOf" in {
val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ Promise.successful[Int](5)
val futures = Vector.fill[Future[Int]](10)(Promise[Int]().future) :+ Promise.successful[Int](5).future
Await.result(Future.firstCompletedOf(futures), timeout.duration) must be(5)
}
@ -384,18 +385,18 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val timeout = 10000 millis
val f = new IllegalStateException("test")
intercept[IllegalStateException] {
Await.result(Promise.failed[String](f) zip Promise.successful("foo"), timeout)
Await.result(Promise.failed[String](f).future zip Promise.successful("foo").future, timeout)
} must be(f)
intercept[IllegalStateException] {
Await.result(Promise.successful("foo") zip Promise.failed[String](f), timeout)
Await.result(Promise.successful("foo").future zip Promise.failed[String](f).future, timeout)
} must be(f)
intercept[IllegalStateException] {
Await.result(Promise.failed[String](f) zip Promise.failed[String](f), timeout)
Await.result(Promise.failed[String](f).future zip Promise.failed[String](f).future, timeout)
} must be(f)
Await.result(Promise.successful("foo") zip Promise.successful("foo"), timeout) must be(("foo", "foo"))
Await.result(Promise.successful("foo").future zip Promise.successful("foo").future, timeout) must be(("foo", "foo"))
}
"fold by composing" in {
@ -542,7 +543,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
filterException[TimeoutException] { intercept[TimeoutException] { Await.ready(f3, 0 millis) } }
}
"futureComposingWithContinuations" in {
//FIXME DATAFLOW
/*"futureComposingWithContinuations" in {
import Future.flow
val actor = system.actorOf(Props[TestActor])
@ -835,7 +837,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
f3 must be('completed)
val p1 = Promise[String]()
val f4 = p1 map { s latch(7).open(); Await.ready(latch(8), TestLatch.DefaultTimeout); s.length }
val f4 = p1.future map { s latch(7).open(); Await.ready(latch(8), TestLatch.DefaultTimeout); s.length }
f4 foreach (_ latch(9).open())
p1 must not be ('completed)
@ -860,7 +862,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val l1, l2 = new TestLatch
val complex = Future() map { _
Future.blocking()
scala.concurrent.impl.InternalFutureUtil.releaseFutureStack(implicitly[ExecutionContext])
val nested = Future(())
nested foreach (_ l1.open())
Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed
@ -870,11 +872,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
Await.ready(complex, timeout.duration) must be('completed)
}
"should capture first exception with dataflow" in {
//FIXME DATAFLOW
/*"should capture first exception with dataflow" in {
import Future.flow
val f1 = flow { 40 / 0 }
intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout))
}
}*/
}
}
@ -899,7 +902,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"transform result with map" in { f((future, result) Await.result((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) }
"compose result with flatMap" in {
f { (future, result)
val r = for (r future; p Promise.successful("foo")) yield r.toString + p
val r = for (r future; p Promise.successful("foo").future) yield r.toString + p
Await.result(r, timeout.duration) must be(result.toString + "foo")
}
}
@ -907,13 +910,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
f { (future, result)
val p = Promise[Any]()
future foreach p.success
Await.result(p, timeout.duration) must be(result)
Await.result(p.future, timeout.duration) must be(result)
}
}
"zip properly" in {
f { (future, result)
Await.result(future zip Promise.successful("foo"), timeout.duration) must be((result, "foo"))
(evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")), timeout.duration) } must produce[RuntimeException]).getMessage must be("ohnoes")
Await.result(future zip Promise.successful("foo").future, timeout.duration) must be((result, "foo"))
(evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, timeout.duration) } must produce[RuntimeException]).getMessage must be("ohnoes")
}
}
"not recover from exception" in { f((future, result) Await.result(future.recover({ case _ "pigdog" }), timeout.duration) must be(result)) }
@ -921,7 +924,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
f { (future, result)
val p = Promise[Any]()
future.onSuccess { case x p.success(x) }
Await.result(p, timeout.duration) must be(result)
Await.result(p.future, timeout.duration) must be(result)
}
}
"not project a failure" in { f((future, result) (evaluating { Await.result(future.failed, timeout.duration) } must produce[NoSuchElementException]).getMessage must be("Future.failed not completed with a throwable. Instead completed with: " + result)) }
@ -947,11 +950,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
}
"retain exception with map" in { f((future, message) (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with flatMap" in { f((future, message) (evaluating { Await.result(future flatMap (_ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with flatMap" in { f((future, message) (evaluating { Await.result(future flatMap (_ Promise.successful[Any]("foo").future), timeout.duration) } must produce[E]).getMessage must be(message)) }
"not perform action with foreach" is pending
"zip properly" in {
f { (future, message) (evaluating { Await.result(future zip Promise.successful("foo"), timeout.duration) } must produce[E]).getMessage must be(message) }
f { (future, message) (evaluating { Await.result(future zip Promise.successful("foo").future, timeout.duration) } must produce[E]).getMessage must be(message) }
}
"recover from exception" in { f((future, message) Await.result(future.recover({ case e if e.getMessage == message "pigdog" }), timeout.duration) must be("pigdog")) }
"not perform action on result" is pending
@ -960,7 +963,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
f { (future, message)
val p = Promise[Any]()
future.onFailure { case _ p.success(message) }
Await.result(p, timeout.duration) must be(message)
Await.result(p.future, timeout.duration) must be(message)
}
}
"always cast successfully using mapTo" in { f((future, message) (evaluating { Await.result(future.mapTo[java.lang.Thread], timeout.duration) } must produce[E]).getMessage must be(message)) }

View file

@ -10,7 +10,7 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import com.typesafe.config.Config
import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell }
import akka.testkit.AkkaSpec
import scala.concurrent.Await
import scala.concurrent.{ Future, Promise, Await }
import scala.concurrent.util.duration.intToDurationInt
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -76,7 +76,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
}
})
t.start
result
result.future
}
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters, system)

View file

@ -6,8 +6,7 @@ package akka.pattern
import akka.testkit._
import scala.concurrent.util.duration._
import org.scalatest.BeforeAndAfter
import akka.dispatch.{ Promise, Future }
import scala.concurrent.Await
import scala.concurrent.{ Promise, Future, Await }
class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter {
@ -37,7 +36,7 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter {
def openBreaker: Unit = {
for (i 1 to 5)
Await.result(breakers.breaker.withCircuitBreaker(Future(unreliableCall("fail"))) recoverWith {
case _ Promise.successful("OK")
case _ Promise.successful("OK").future
}, 1.second.dilated)
}
@ -62,9 +61,7 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter {
val futures = for (i 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {
Thread.sleep(10); unreliableCall("success")
}) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO")
}
}) recoverWith { case _: CircuitBreakerOpenException Promise.successful("CBO").future }
val futureList = Future.sequence(futures)
@ -82,9 +79,7 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter {
val futures = for (i 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {
Thread.sleep(10); unreliableCall("succeed")
}) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO")
}
}) recoverWith { case _: CircuitBreakerOpenException Promise.successful("CBO").future }
val futureList = Future.sequence(futures)
@ -106,7 +101,7 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter {
val futures = for (i 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {
Thread.sleep(10); unreliableCall("succeed")
}) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO")
case _: CircuitBreakerOpenException Promise.successful("CBO").future
}
val futureList = Future.sequence(futures)

View file

@ -9,7 +9,7 @@ import language.postfixOps
import scala.concurrent.util.duration._
import akka.testkit._
import org.scalatest.BeforeAndAfter
import akka.dispatch.Future
import scala.concurrent.Future
import scala.concurrent.Await
object CircuitBreakerSpec {

View file

@ -8,10 +8,9 @@ import language.postfixOps
import akka.testkit.AkkaSpec
import akka.actor.{ Props, Actor }
import scala.concurrent.Await
import scala.concurrent.{ Future, Promise, Await }
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import akka.dispatch.{ Future, Promise }
object PatternSpec {
case class Work(duration: Duration)
@ -50,16 +49,16 @@ class PatternSpec extends AkkaSpec {
"pattern.after" must {
"be completed successfully eventually" in {
val f = after(1 second, using = system.scheduler)(Promise.successful(5))
val f = after(1 second, using = system.scheduler)(Promise.successful(5).future)
val r = Future.firstCompletedOf(Seq(Promise[Int](), f))
val r = Future.firstCompletedOf(Seq(Promise[Int]().future, f))
Await.result(r, remaining) must be(5)
}
"be completed abnormally eventually" in {
val f = after(1 second, using = system.scheduler)(Promise.failed(new IllegalStateException("Mexico")))
val f = after(1 second, using = system.scheduler)(Promise.failed(new IllegalStateException("Mexico")).future)
val r = Future.firstCompletedOf(Seq(Promise[Int](), f))
val r = Future.firstCompletedOf(Seq(Promise[Int]().future, f))
intercept[IllegalStateException] { Await.result(r, remaining) }.getMessage must be("Mexico")
}
}

View file

@ -4,7 +4,7 @@
package akka.util
import org.scalatest.matchers.MustMatchers
import akka.dispatch.Future
import scala.concurrent.Future
import akka.testkit.AkkaSpec
import scala.concurrent.Await
import scala.util.Random

View file

@ -1,29 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch;
import akka.util.Unsafe;
abstract class AbstractPromise {
private volatile Object _ref = DefaultPromise.EmptyPending();
final static long _refOffset; // Memory offset to _ref field
static {
try {
_refOffset = Unsafe.instance.objectFieldOffset(AbstractPromise.class.getDeclaredField("_ref"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}
}
protected final boolean updateState(Object oldState, Object newState) {
return Unsafe.instance.compareAndSwapObject(this, _refOffset, oldState, newState);
}
protected final Object getState() {
return _ref;
}
}

View file

@ -4,12 +4,12 @@
package akka.actor
import java.util.concurrent.atomic.AtomicLong
import akka.dispatch._
import akka.routing._
import akka.AkkaException
import akka.event._
import akka.util.{ NonFatal, Switch, Helpers }
import scala.concurrent.{ Future, Promise }
import java.util.concurrent.atomic.AtomicLong
/**
* Interface for all ActorRef providers to implement.
@ -361,9 +361,7 @@ class LocalActorRefProvider(
def provider: ActorRefProvider = LocalActorRefProvider.this
override def stop(): Unit = stopped switchOn {
terminationFuture.complete(causeOfTermination.toLeft(()))
}
override def stop(): Unit = stopped switchOn { terminationPromise.complete(causeOfTermination.toLeft(())) }
override def isTerminated: Boolean = stopped.isOn
@ -458,7 +456,9 @@ class LocalActorRefProvider(
def dispatcher: MessageDispatcher = system.dispatcher
lazy val terminationFuture: Promise[Unit] = Promise[Unit]()(dispatcher)
lazy val terminationPromise: Promise[Unit] = Promise[Unit]()
def terminationFuture: Future[Unit] = terminationPromise.future
@volatile
private var extraNames: Map[String, InternalActorRef] = Map()

View file

@ -11,7 +11,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.tailrec
import scala.concurrent.util.Duration
import java.io.Closeable
import scala.concurrent.{ Await, Awaitable, CanAwait }
import scala.concurrent.{ Await, Awaitable, CanAwait, Future }
import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }

View file

@ -6,8 +6,7 @@ package akka.actor
import language.higherKinds
import language.postfixOps
import akka.dispatch.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.util.Duration
import akka.util.{ ByteString, NonFatal }
import java.net.{ SocketAddress, InetSocketAddress }
@ -564,7 +563,7 @@ object IO {
* A mutable reference to an [[akka.actor.IO.Iteratee]]. Not thread safe.
*
* Designed for use within an [[akka.actor.Actor]], although all actions
* perfomed on the Iteratee are processed within a [[akka.dispatch.Future]]
* perfomed on the Iteratee are processed within a [[scala.concurrent.Future]]
* so it is not safe to refer to the Actor's state from within this Iteratee.
* Messages should instead be sent to the Actor in order to modify state.
*

View file

@ -9,10 +9,10 @@ import akka.japi.{ Creator, Option ⇒ JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Timeout, NonFatal }
import scala.concurrent.util.Duration
import scala.concurrent.Await
import scala.concurrent.{ Await, Future }
import akka.util.Reflect.instantiator
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.dispatch._
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.reflect.ClassTag

View file

@ -4,29 +4,12 @@
package akka.dispatch
import language.implicitConversions
import language.postfixOps
import language.higherKinds
import scala.runtime.{ BoxedUnit, AbstractPartialFunction }
import akka.japi.{ Function JFunc, Option JOption }
import scala.util.continuations._
import scala.reflect.ClassTag
import scala.concurrent.{ Future, Promise, ExecutionContext }
import java.lang.{ Iterable JIterable }
import java.util.{ LinkedList JLinkedList }
import scala.annotation.tailrec
import scala.collection.mutable.Stack
import akka.util.BoxedType
import akka.util.NonFatal
import akka.event.Logging.{ LogEventException, Debug, Error }
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.concurrent.{ ExecutionException, Callable, TimeoutException }
import java.util.concurrent.atomic.{ AtomicInteger }
import akka.pattern.AskTimeoutException
import scala.util.DynamicVariable
import scala.concurrent.util.Duration
import scala.concurrent.ExecutionContext
import scala.runtime.{ BoxedUnit, AbstractPartialFunction }
import scala.concurrent.{ Awaitable, Await, CanAwait }
/**
* Futures is the Java API for Futures and Promises
@ -41,17 +24,17 @@ object Futures {
/**
* Java API, equivalent to Promise.apply
*/
def promise[T](executor: ExecutionContext): Promise[T] = Promise[T]()(executor)
def promise[T](executor: ExecutionContext): Promise[T] = Promise[T]()
/**
* Java API, creates an already completed Promise with the specified exception
*/
def failed[T](exception: Throwable, executor: ExecutionContext): Promise[T] = Promise.failed(exception)(executor)
def failed[T](exception: Throwable, executor: ExecutionContext): Promise[T] = Promise.failed(exception)
/**
* Java API, Creates an already completed Promise with the specified result
*/
def successful[T](result: T, executor: ExecutionContext): Promise[T] = Promise.successful(result)(executor)
def successful[T](result: T, executor: ExecutionContext): Promise[T] = Promise.successful(result)
/**
* Java API.
@ -131,767 +114,7 @@ object Futures {
* Note: Calling 'Await.result(future)' or 'Await.ready(future)' will automatically trigger this method.
*
*/
def blocking(): Unit = Future.blocking()
}
object Future {
/**
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: T)(implicit executor: ExecutionContext): Future[T] = {
val promise = Promise[T]()
executor.execute(new Runnable {
def run =
promise complete {
try {
Right(body)
} catch {
case NonFatal(e)
executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e))
Left(e)
}
}
})
promise
}
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
/**
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] =
in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val futureResult = Promise[T]()
val completeFirst: Either[Throwable, T] Unit = futureResult tryComplete _
futures.foreach(_ onComplete completeFirst)
futureResult
}
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](futures: Traversable[Future[T]])(predicate: T Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
if (futures.isEmpty) Promise.successful[Option[T]](None)
else {
val result = Promise[Option[T]]()
val ref = new AtomicInteger(futures.size)
val search: Either[Throwable, T] Unit = v try {
v match {
case Right(r) if (predicate(r)) result tryComplete Right(Some(r))
case _
}
} finally {
if (ref.decrementAndGet == 0)
result tryComplete Right(None)
}
futures.foreach(_ onComplete search)
result
}
}
/**
* A non-blocking fold over the specified futures, with the start value of the given zero.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
* Example:
* <pre>
* val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
* </pre>
*/
def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Promise.successful(zero)
else sequence(futures).map(_.foldLeft(zero)(foldFun))
}
/**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
* Example:
* <pre>
* val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
* </pre>
*/
def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection"))
else sequence(futures).map(_ reduceLeft op)
}
/**
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A Future[B].
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel:
* <pre>
* val myFutureList = Future.traverse(myList)(x Future(myFunc(x)))
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr; b fb) yield (r += b)
}.map(_.result)
/**
* Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
* Continuations plugin.
*
* Within the block, the result of a Future may be accessed by calling Future.apply. At that point
* execution is suspended with the rest of the block being stored in a continuation until the result
* of the Future is available. If an Exception is thrown while processing, it will be contained
* within the resulting Future.
*
* This allows working with Futures in an imperative style without blocking for each result.
*
* Completing a Future using 'Promise << Future' will also suspend execution until the
* value of the other Future is available.
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
val p = Promise[A]
dispatchTask({ ()
try {
(reify(body) foreachFull (p success, p failure): Future[Any]) onFailure {
case NonFatal(e) p tryComplete Left(e)
}
} catch {
case NonFatal(e) p tryComplete Left(e)
}
}, true)
p.future
}
/**
* Signals that the current thread of execution will potentially engage
* an action that will take a non-trivial amount of time, perhaps by using blocking.IO or using a lot of CPU time,
* giving the system a chance to spawn new threads, reuse old threads or otherwise,
* to prevent starvation and/or unfairness.
*
* Assures that any Future tasks initiated in the current thread will be
* executed asynchronously, including any tasks currently queued to be
* executed in the current thread. This is needed if the current task may
* block, causing delays in executing the remaining tasks which in some
* cases may cause a deadlock.
*
* Note: Calling 'Await.result(future)' or 'Await.ready(future)' will automatically trigger this method.
*
* For example, in the following block of code the call to 'latch.open'
* might not be executed until after the call to 'latch.await', causing
* a deadlock. By adding 'Future.blocking()' the call to 'latch.open'
* will instead be dispatched separately from the current block, allowing
* it to be run in parallel:
* <pre>
* val latch = new StandardLatch
* val future = Future() map { _
* Future.blocking()
* val nested = Future()
* nested foreach (_ latch.open)
* latch.await
* }
* </pre>
*/
def blocking(): Unit =
_taskStack.get match {
case stack if (stack ne null) && stack.nonEmpty
val executionContext = _executionContext.value match {
case null throw new IllegalStateException("'blocking' needs to be invoked inside a Future callback.")
case some some
}
val tasks = stack.elems
stack.clear()
_taskStack.remove()
dispatchTask(() _taskStack.get.elems = tasks, true)(executionContext)
case _ _taskStack.remove()
}
private val _taskStack = new ThreadLocal[Stack[() Unit]]()
private val _executionContext = new DynamicVariable[ExecutionContext](null)
/**
* Internal API, do not call
*/
private[akka] def dispatchTask(task: () Unit, force: Boolean = false)(implicit executor: ExecutionContext): Unit =
_taskStack.get match {
case stack if (stack ne null) && (executor eq _executionContext.value) && !force stack push task
case _ executor.execute(
new Runnable {
def run =
try {
_executionContext.withValue(executor) {
val taskStack = Stack.empty[() Unit]
taskStack push task
_taskStack set taskStack
while (taskStack.nonEmpty) {
val next = taskStack.pop()
try {
next.apply()
} catch {
case NonFatal(e) executor.reportFailure(e)
}
}
}
} finally {
_taskStack.remove()
}
})
}
}
/**
* Trait representing a value that may not have been computed yet.
*
* @define asyncCallbackWarning
*
* Note: the callback function may (and probably will) run in another thread,
* and therefore should not refer to any unsynchronized state. In
* particular, if using this method from an actor, do not access
* the state of the actor from the callback function.
* [[akka.dispatch.Promise]].`completeWith`,
* [[akka.pattern.PipeToSupport.PipeableFuture]].`pipeTo`,
* and [[akka.dispatch.Future]].`fallbackTo` are some methods to consider
* using when possible, to avoid concurrent callbacks.
*/
sealed trait Future[+T] extends Awaitable[T] {
protected implicit def executor: ExecutionContext
protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) Right(t.value.asInstanceOf[X])
case Left(t: InterruptedException) Left(new RuntimeException("Boxed InterruptedException", t))
case _ source
}
/**
* @return a new Future that will contain a tuple containing the successful result of this and that Future.
* If this or that fail, they will race to complete the returned Future with their failure.
* The returned Future will not be completed if neither this nor that are completed.
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
val p = Promise[(T, U)]()
onComplete {
case Left(t) p failure t
case Right(r) that onSuccess { case r2 p success ((r, r2)) }
}
that onFailure { case f p tryComplete Left(f) }
p.future
}
/**
* For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
* Returns the result of this Future without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T Future[Any]))
/**
* Tests whether this Future has been completed.
*/
def isCompleted: Boolean
/**
* The contained value of this Future. Before this Future is completed
* the value will be None. After completion the value will be Some(Right(t))
* if it contains a valid result, or Some(Left(error)) if it contains
* an exception.
*/
def value: Option[Either[Throwable, T]]
/**
* When this Future is completed, apply the provided function to the
* Future. If the Future has already been completed, this will apply
* immediately. Multiple
* callbacks may be registered; there is no guarantee that they will be
* executed in a particular order.
*
* $asyncCallbackWarning
*/
def onComplete[U](func: Either[Throwable, T] U): this.type
/**
* When the future is completed with a valid result, apply the provided
* PartialFunction to the result. See `onComplete` for more details.
* <pre>
* future onSuccess {
* case Foo target ! "foo"
* case Bar target ! "bar"
* }
* </pre>
*
* $asyncCallbackWarning
*/
final def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
case Right(r) if pf isDefinedAt r pf(r)
case _
}
/**
* When the future is completed with an exception, apply the provided
* PartialFunction to the exception. See `onComplete` for more details.
* <pre>
* future onFailure {
* case NumberFormatException target ! "wrong format"
* }
* </pre>
*
* $asyncCallbackWarning
*/
final def onFailure[U](pf: PartialFunction[Throwable, U]): this.type = onComplete {
case Left(ex) if pf isDefinedAt ex pf(ex)
case _
}
/**
* Returns a failure projection of this Future
* If `this` becomes completed with a failure, that failure will be the success of the returned Future
* If `this` becomes completed with a result, then the returned future will fail with a NoSuchElementException
*/
final def failed: Future[Throwable] = {
val p = Promise[Throwable]()
this.onComplete {
case Left(t) p success t
case Right(r) p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r)
}
p.future
}
/**
* Returns a new Future that will either hold the successful value of this Future,
* or, it this Future fails, it will hold the result of "that" Future.
*/
def fallbackTo[U >: T](that: Future[U]): Future[U] = {
val p = Promise[U]()
onComplete {
case r @ Right(_) p complete r
case _ p completeWith that
}
p.future
}
/**
* Creates a new Future that will handle any matching Throwable that this
* Future might contain. If there is no match, or if this Future contains
* a valid result then the new Future will contain the same.
* Example:
* <pre>
* Future(6 / 0) recover { case e: ArithmeticException 0 } // result: 0
* Future(6 / 0) recover { case e: NotFoundException 0 } // result: exception
* Future(6 / 2) recover { case e: ArithmeticException 0 } // result: 3
* </pre>
*
* $asyncCallbackWarning
*/
final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
val p = Promise[A]()
onComplete {
case Left(e) if pf isDefinedAt e p.complete(try { Right(pf(e)) } catch { case NonFatal(x) Left(x) })
case otherwise p complete otherwise
}
p.future
}
/**
* Returns a new Future that will, in case this future fails,
* be completed with the resulting Future of the given PartialFunction,
* if the given PartialFunction matches the failure of the original Future.
*
* If the PartialFunction throws, that Throwable will be propagated to the returned Future.
*
* Example:
*
* {{{
* val f = Future { Int.MaxValue }
* Future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
* }}}
*
* $asyncCallbackWarning
*/
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
val p = Promise[U]()
onComplete {
case Left(t) if pf isDefinedAt t
try { p completeWith pf(t) } catch { case NonFatal(t) p complete resolve(Left(t)) }
case otherwise p complete otherwise
}
p.future
}
/**
* Returns a new Future that will contain the completed result of this Future,
* and which will invoke the supplied PartialFunction when completed.
*
* This allows for establishing order of side-effects.
*
* {{{
* Future { 5 } andThen {
* case something => assert(something is awesome)
* } andThen {
* case Left(t) => handleProblem(t)
* case Right(v) => dealWithSuccess(v)
* }
* }}}
*
* $asyncCallbackWarning
*/
def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = {
val p = Promise[T]()
onComplete { case r try if (pf isDefinedAt r) pf(r) finally p complete r }
p.future
}
/**
* Creates a new Future by applying a function to the successful result of
* this Future. If this Future is completed with an exception then the new
* Future will also contain this exception.
* Example:
* <pre>
* val future1 = for {
* a: Int <- actor ? "Hello" // returns 5
* b: String <- actor ? a // returns "10"
* c: String <- actor ? 7 // returns "14"
* } yield b + "-" + c
* </pre>
*
* $asyncCallbackWarning
*/
final def map[A](f: T A): Future[A] = {
val future = Promise[A]()
onComplete {
case l: Left[_, _] future complete l.asInstanceOf[Either[Throwable, A]]
case Right(res)
future complete (try {
Right(f(res))
} catch {
case NonFatal(e)
executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e))
Left(e)
})
}
future
}
/**
* Creates a new Future[A] which is completed with this Future's result if
* that conforms to A's erased type or a ClassCastException otherwise.
*
* When used from Java, to create the ClassTag, use:
* import static akka.japi.Util.classTag;
* future.mapTo(classTag(MyClass.class));
*/
final def mapTo[A](implicit m: ClassTag[A]): Future[A] = {
val fa = Promise[A]()
onComplete {
case l: Left[_, _] fa complete l.asInstanceOf[Either[Throwable, A]]
case Right(t)
fa complete (try {
Right(BoxedType(m.runtimeClass).cast(t).asInstanceOf[A])
} catch {
case e: ClassCastException Left(e)
})
}
fa.future
}
/**
* Creates a new Future by applying a function to the successful result of
* this Future, and returns the result of the function as the new Future.
* If this Future is completed with an exception then the new Future will
* also contain this exception.
* Example:
* <pre>
* val future1 = for {
* a: Int <- actor ? "Hello" // returns 5
* b: String <- actor ? a // returns "10"
* c: String <- actor ? 7 // returns "14"
* } yield b + "-" + c
* </pre>
*
* $asyncCallbackWarning
*/
final def flatMap[A](f: T Future[A]): Future[A] = {
val p = Promise[A]()
onComplete {
case l: Left[_, _] p complete l.asInstanceOf[Either[Throwable, A]]
case Right(r)
try {
p completeWith f(r)
} catch {
case NonFatal(e)
executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e))
p complete Left(e)
case t: Throwable
p complete Left(new ExecutionException(t)); throw t
}
}
p.future
}
/**
* Same as onSuccess { case r => f(r) } but is also used in for-comprehensions
*
* $asyncCallbackWarning
*/
final def foreach[U](f: T U): Unit = onComplete {
case Right(r) f(r)
case _
}
/**
* Used by for-comprehensions
*
* $asyncCallbackWarning
*/
final def withFilter(p: T Boolean) = new FutureWithFilter[T](this, p)
final class FutureWithFilter[+A](self: Future[A], p: A Boolean) {
def foreach(f: A Unit): Unit = self filter p foreach f
def map[B](f: A B): Future[B] = self filter p map f
def flatMap[B](f: A Future[B]): Future[B] = self filter p flatMap f
def withFilter(q: A Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x p(x) && q(x))
}
/**
* Returns a new Future that will hold the successful result of this Future if it matches
* the given predicate, if it doesn't match, the resulting Future will be a failed Future
* with a MatchError, of if this Future fails, that failure will be propagated to the returned Future
*
* $asyncCallbackWarning
*/
final def filter(pred: T Boolean): Future[T] = {
val p = Promise[T]()
onComplete {
case l: Left[_, _] p complete l.asInstanceOf[Either[Throwable, T]]
case r @ Right(res) p complete (try {
if (pred(res)) r else Left(new MatchError(res))
} catch {
case NonFatal(e)
executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e))
Left(e)
})
}
p.future
}
}
object Promise {
/**
* Creates a non-completed Promise
*
* Scala API
*/
def apply[A]()(implicit executor: ExecutionContext): Promise[A] = new DefaultPromise[A]()
/**
* Creates an already completed Promise with the specified exception
*/
def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Left(exception))
/**
* Creates an already completed Promise with the specified result
*/
def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Right(result))
}
/**
* Essentially this is the Promise (or write-side) of a Future (read-side).
*/
trait Promise[T] extends Future[T] {
/**
* Returns the Future associated with this Promise
*/
def future: Future[T] = this
/**
* Completes this Promise with the specified result, if not already completed.
* @return whether this call completed the Promise
*/
def tryComplete(value: Either[Throwable, T]): Boolean
/**
* Completes this Promise with the specified result, if not already completed.
* @throws IllegalStateException if already completed, this is to aid in debugging of complete-races,
* use tryComplete to do a conditional complete.
* @return this
*/
final def complete(value: Either[Throwable, T]): this.type =
if (tryComplete(value)) this else throw new IllegalStateException("Promise already completed: " + this + " tried to complete with " + value)
/**
* Completes this Promise with the specified result, if not already completed.
* @return this
*/
final def success(result: T): this.type = complete(Right(result))
/**
* Completes this Promise with the specified exception, if not already completed.
* @return this
*/
final def failure(exception: Throwable): this.type = complete(Left(exception))
/**
* Completes this Promise with the specified other Future, when that Future is completed,
* unless this Promise has already been completed.
* @return this.
*/
final def completeWith(other: Future[T]): this.type = {
other onComplete { tryComplete(_) }
this
}
final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any]) cont(complete(Right(value))) }
final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any])
val fr = Promise[Any]()
val thisPromise = this
thisPromise completeWith other onComplete { v
try {
fr completeWith cont(thisPromise)
} catch {
case NonFatal(e)
executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e))
fr failure e
}
}
fr
}
}
//Companion object to FState, just to provide a cheap, immutable default entry
private[dispatch] object DefaultPromise {
def EmptyPending[T](): List[T] = Nil
}
/**
* The default concrete Future implementation.
*/
class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
self
protected final def tryAwait(atMost: Duration): Boolean = {
Future.blocking
@tailrec
def awaitUnsafe(waitTimeNanos: Long): Boolean = {
if (!isCompleted && waitTimeNanos > 0) {
val ms = NANOSECONDS.toMillis(waitTimeNanos)
val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
val start = System.nanoTime()
try { synchronized { if (!isCompleted) wait(ms, ns) } } catch { case e: InterruptedException }
awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
} else isCompleted
}
awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
}
@throws(classOf[TimeoutException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (isCompleted || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost + "]")
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case Left(e: AskTimeoutException) throw new AskTimeoutException(e.getMessage, e) // to get meaningful stack trace
case Left(e) throw e
case Right(r) r
}
def value: Option[Either[Throwable, T]] = getState match {
case _: List[_] None
case c: Either[_, _] Some(c.asInstanceOf[Either[Throwable, T]])
}
def isCompleted(): Boolean = getState match {
case _: Either[_, _] true
case _ false
}
def tryComplete(value: Either[Throwable, T]): Boolean = {
val callbacks: List[Either[Throwable, T] Unit] = {
try {
@tailrec
def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] Unit] = {
getState match {
case raw: List[_]
val cur = raw.asInstanceOf[List[Either[Throwable, T] Unit]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ null
}
}
tryComplete(resolve(value))
} finally {
synchronized { notifyAll() } //Notify any evil blockers
}
}
callbacks match {
case null false
case cs if cs.isEmpty true
case cs Future.dispatchTask(() cs.foreach(f notifyCompleted(f, value))); true
}
}
def onComplete[U](func: Either[Throwable, T] U): this.type = {
@tailrec //Returns whether the future has already been completed or not
def tryAddCallback(): Either[Throwable, T] = {
val cur = getState
cur match {
case r: Either[_, _] r.asInstanceOf[Either[Throwable, T]]
case listeners: List[_] if (updateState(listeners, func :: listeners)) null else tryAddCallback()
}
}
tryAddCallback() match {
case null this
case completed
Future.dispatchTask(() notifyCompleted(func, completed))
this
}
}
private final def notifyCompleted[U](func: Either[Throwable, T] U, result: Either[Throwable, T]): Unit =
try func(result) catch { case NonFatal(e) executor reportFailure e }
}
/**
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
* a Future-composition but you already have a value to contribute.
*/
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
val value = Some(resolve(suppliedValue))
def tryComplete(value: Either[Throwable, T]): Boolean = false
def onComplete[U](func: Either[Throwable, T] U): this.type = {
val completedAs = value.get
Future dispatchTask (() func(completedAs))
this
}
def isCompleted(): Boolean = true
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
case Left(e) throw e
case Right(r) r
}
def blocking(): Unit = scala.concurrent.impl.InternalFutureUtil.releaseFutureStack(ExecutionContext.defaultExecutionContext) //FIXME NOT CORRECT EC
}
/**

View file

@ -9,6 +9,7 @@ import java.util.concurrent.TimeoutException
import annotation.tailrec
import akka.actor._
import akka.dispatch._
import scala.concurrent.{ Future, Promise }
import akka.util.{ NonFatal, Timeout, Unsafe }
/**
@ -43,7 +44,7 @@ trait AskSupport {
implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
@ -68,21 +69,21 @@ trait AskSupport {
* } pipeTo nextActor
* }}}
*
* [see [[akka.dispatch.Future]] for a description of `flow`]
* [see [[scala.concurrent.Future]] for a description of `flow`]
*/
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
case ref: InternalActorRef if ref.isTerminated
actorRef.tell(message)
Promise.failed(new AskTimeoutException("sending to terminated ref breaks promises"))(ref.provider.dispatcher)
Promise.failed[Any](new AskTimeoutException("sending to terminated ref breaks promises")).future
case ref: InternalActorRef
val provider = ref.provider
if (timeout.duration.length <= 0) {
actorRef.tell(message)
Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher)
Promise.failed[Any](new AskTimeoutException("not asking with negative timeout")).future
} else {
val a = PromiseActorRef(provider, timeout)
actorRef.tell(message, a)
a.result
a.result.future
}
case _ throw new IllegalArgumentException("incompatible ActorRef " + actorRef)
}
@ -93,7 +94,7 @@ trait AskSupport {
private[akka] final class AskableActorRef(val actorRef: ActorRef) {
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
@ -118,12 +119,12 @@ trait AskSupport {
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
* [see the [[scala.concurrent.Future]] companion object for a description of `flow`]
*/
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
/**
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
@ -148,7 +149,7 @@ trait AskSupport {
* } pipeTo nextActor
* }}}
*
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
* [see the [[scala.concurrent.Future]] companion object for a description of `flow`]
*/
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
}
@ -277,7 +278,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
@tailrec
override def stop(): Unit = {
def ensureCompleted(): Unit = {
if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped")))
result.tryComplete(Left(new ActorKilledException("Stopped")))
val watchers = clearWatchers()
if (!watchers.isEmpty) {
val termination = Terminated(this)(existenceConfirmed = true)
@ -304,10 +305,10 @@ private[akka] object PromiseActorRef {
private case class StoppedWithPath(path: ActorPath)
def apply(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = {
val result = Promise[Any]()(provider.dispatcher)
val result = Promise[Any]()
val a = new PromiseActorRef(provider, result)
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) }
result onComplete { _ try a.stop() finally f.cancel() }
result.future onComplete { _ try a.stop() finally f.cancel() }
a
}
}

View file

@ -6,14 +6,12 @@ package akka.pattern
import java.util.concurrent.atomic.{ AtomicInteger, AtomicLong, AtomicBoolean }
import akka.AkkaException
import akka.actor.Scheduler
import akka.dispatch.{ Future, Promise }
import akka.util.{ NonFatal, Unsafe }
import scala.concurrent.ExecutionContext
import scala.concurrent.util.duration._
import scala.concurrent.util.{ Duration, Deadline }
import util.control.NoStackTrace
import scala.util.control.NoStackTrace
import java.util.concurrent.{ Callable, CopyOnWriteArrayList }
import scala.concurrent.{ Awaitable, Await, CanAwait }
import scala.concurrent.{ ExecutionContext, Future, Promise, Awaitable, Await, CanAwait }
import scala.concurrent.util.{ Duration, Deadline }
import scala.concurrent.util.duration._
/**
* Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread
@ -111,22 +109,18 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
*
* @param body Call needing protected
* @tparam T return type from call
* @return [[akka.dispatch.Future]] containing the call result
* @return [[scala.concurrent.Future]] containing the call result
*/
def withCircuitBreaker[T](body: Future[T]): Future[T] = {
currentState.invoke(body)
}
def withCircuitBreaker[T](body: Future[T]): Future[T] = currentState.invoke(body)
/**
* Java API for withCircuitBreaker
*
* @param body Call needing protected
* @tparam T return type from call
* @return [[akka.dispatch.Future]] containing the call result
* @return [[scala.concurrent.Future]] containing the call result
*/
def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = {
withCircuitBreaker(body.call)
}
def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = withCircuitBreaker(body.call)
/**
* Wraps invocations of synchronous calls that need to be protected
@ -137,16 +131,10 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @tparam T return type from call
* @return The result of the call
*/
def withSyncCircuitBreaker[T](body: T): T = {
Await.result(withCircuitBreaker(
{
try
Promise.successful(body)(CircuitBreaker.syncExecutionContext)
catch {
case NonFatal(t) Promise.failed(t)(CircuitBreaker.syncExecutionContext)
}
}), callTimeout)
}
def withSyncCircuitBreaker[T](body: T): T =
Await.result(
withCircuitBreaker({ try Promise.successful(body) catch { case NonFatal(t) Promise.failed(t) } }.future),
callTimeout)
/**
* Java API for withSyncCircuitBreaker
@ -156,9 +144,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @return The result of the call
*/
def callWithSyncCircuitBreaker[T](body: Callable[T]): T = {
withSyncCircuitBreaker(body.call)
}
def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call)
/**
* Adds a callback to execute when circuit breaker opens
@ -181,9 +167,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onOpen[T](callback: Callable[T]): CircuitBreaker = {
onOpen(callback.call)
}
def onOpen[T](callback: Callable[T]): CircuitBreaker = onOpen(callback.call)
/**
* Adds a callback to execute when circuit breaker transitions to half-open
@ -206,9 +190,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = {
onHalfOpen(callback.call)
}
def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = onHalfOpen(callback.call)
/**
* Adds a callback to execute when circuit breaker state closes
@ -231,9 +213,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onClose[T](callback: Callable[T]): CircuitBreaker = {
onClose(callback.call)
}
def onClose[T](callback: Callable[T]): CircuitBreaker = onClose(callback.call)
/**
* Retrieves current failure count.
@ -249,37 +229,30 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @param toState State being transitioning from
* @throws IllegalStateException if an invalid transition is attempted
*/
private def transition(fromState: State, toState: State): Unit = {
private def transition(fromState: State, toState: State): Unit =
if (swapState(fromState, toState))
toState.enter()
else
throw new IllegalStateException("Illegal transition attempted from: " + fromState + " to " + toState)
}
/**
* Trips breaker to an open state. This is valid from Closed or Half-Open states.
*
* @param fromState State we're coming from (Closed or Half-Open)
*/
private def tripBreaker(fromState: State): Unit = {
transition(fromState, Open)
}
private def tripBreaker(fromState: State): Unit = transition(fromState, Open)
/**
* Resets breaker to a closed state. This is valid from an Half-Open state only.
*
*/
private def resetBreaker(): Unit = {
transition(HalfOpen, Closed)
}
private def resetBreaker(): Unit = transition(HalfOpen, Closed)
/**
* Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only.
*
*/
private def attemptReset(): Unit = {
transition(Open, HalfOpen)
}
private def attemptReset(): Unit = transition(Open, HalfOpen)
/**
* Internal state abstraction
@ -293,9 +266,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @param listener listener implementation
* @tparam T return type of listener, not used - but supplied for type inference purposes
*/
def addListener[T](listener: () T) {
listeners add listener
}
def addListener[T](listener: () T): Unit = listeners add listener
/**
* Test for whether listeners exist
@ -330,16 +301,12 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
*/
def callThrough[T](body: Future[T]): Future[T] = {
val deadline = callTimeout.fromNow
val bodyFuture = try body catch {
case NonFatal(t) Promise.failed(t)
}
bodyFuture onFailure {
case _ callFails()
} onSuccess {
case _
if (deadline.isOverdue()) callFails()
else callSucceeds()
val bodyFuture = try body catch { case NonFatal(t) Promise.failed(t).future }
bodyFuture onComplete {
case Right(_) if !deadline.isOverdue() callSucceeds()
case _ callFails()
}
bodyFuture
}
/**
@ -392,16 +359,14 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @tparam T Return type of protected call
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] = {
callThrough(body)
}
override def invoke[T](body: Future[T]): Future[T] = callThrough(body)
/**
* On successful call, the failure count is reset to 0
*
* @return
*/
override def callSucceeds(): Unit = { set(0) }
override def callSucceeds(): Unit = set(0)
/**
* On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and
@ -409,27 +374,21 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
*
* @return
*/
override def callFails(): Unit = {
if (incrementAndGet() == maxFailures) tripBreaker(Closed)
}
override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed)
/**
* On entry of this state, failure count is reset.
*
* @return
*/
override def _enter(): Unit = {
set(0)
}
override def _enter(): Unit = set(0)
/**
* Override for more descriptive toString
*
* @return
*/
override def toString: String = {
"Closed with failure count = " + get()
}
override def toString: String = "Closed with failure count = " + get()
}
/**
@ -445,44 +404,36 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @tparam T Return type of protected call
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] = {
if (compareAndSet(true, false))
callThrough(body)
else
Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero))
}
override def invoke[T](body: Future[T]): Future[T] =
if (compareAndSet(true, false)) callThrough(body) else Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero)).future
/**
* Reset breaker on successful call.
*
* @return
*/
override def callSucceeds(): Unit = { resetBreaker() }
override def callSucceeds(): Unit = resetBreaker()
/**
* Reopen breaker on failed call.
*
* @return
*/
override def callFails(): Unit = { tripBreaker(HalfOpen) }
override def callFails(): Unit = tripBreaker(HalfOpen)
/**
* On entry, guard should be reset for that first call to get in
*
* @return
*/
override def _enter(): Unit = {
set(true)
}
override def _enter(): Unit = set(true)
/**
* Override for more descriptive toString
*
* @return
*/
override def toString: String = {
"Half-Open currently testing call for success = " + get()
}
override def toString: String = "Half-Open currently testing call for success = " + get()
}
/**
@ -497,9 +448,8 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @tparam T Return type of protected call
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] = {
Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft))
}
override def invoke[T](body: Future[T]): Future[T] =
Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft)).future
/**
* Calculate remaining timeout to inform the caller in case a backoff algorithm is useful
@ -516,14 +466,14 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
*
* @return
*/
override def callSucceeds(): Unit = {}
override def callSucceeds(): Unit = ()
/**
* No-op for open, calls are never executed so cannot succeed or fail
*
* @return
*/
override def callFails(): Unit = {}
override def callFails(): Unit = ()
/**
* On entering this state, schedule an attempted reset via [[akka.actor.Scheduler]] and store the entry time to
@ -543,9 +493,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
*
* @return
*/
override def toString: String = {
"Open"
}
override def toString: String = "Open"
}
}

View file

@ -5,20 +5,20 @@ package akka.pattern
*/
import scala.concurrent.util.Duration
import scala.concurrent.ExecutionContext
import scala.concurrent.{ ExecutionContext, Promise, Future }
import akka.actor._
import akka.dispatch.{ Promise, Future }
trait FutureTimeoutSupport {
/**
* Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value
* after the specified duration.
*/
def after[T](duration: Duration, using: Scheduler)(value: Future[T])(implicit ec: ExecutionContext): Future[T] =
if (duration.isFinite() && duration.length < 1) value else {
val p = Promise[T]()
val c = using.scheduleOnce(duration) { p completeWith value }
p onComplete { _ c.cancel() }
p
val f = p.future
f onComplete { _ c.cancel() }
f
}
}

View file

@ -6,12 +6,13 @@ package akka.pattern
import akka.actor._
import akka.util.{ Timeout }
import akka.dispatch.{ Unwatch, Watch }
import scala.concurrent.{ Promise, Future }
import scala.concurrent.util.Duration
import akka.dispatch.{ Unwatch, Watch, Promise, Future }
trait GracefulStopSupport {
/**
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
* Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when
* existing messages of the target actor has been processed and the actor has been
* terminated.
*
@ -31,22 +32,23 @@ trait GracefulStopSupport {
* }
* }}}
*
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) Promise.successful(true)
if (target.isTerminated) Promise.successful(true).future
else system match {
case e: ExtendedActorSystem
val internalTarget = target.asInstanceOf[InternalActorRef]
val ref = PromiseActorRef(e.provider, Timeout(timeout))
internalTarget.sendSystemMessage(Watch(target, ref))
ref.result onComplete { // Just making sure we're not leaking here
val f = ref.result.future
f onComplete { // Just making sure we're not leaking here
case Right(Terminated(`target`)) ()
case _ internalTarget.sendSystemMessage(Unwatch(target, ref))
}
target ! PoisonPill
ref.result map {
f map {
case Terminated(`target`) true
case _ false
}

View file

@ -9,14 +9,14 @@ import java.util.concurrent.Callable
object Patterns {
import akka.actor.{ ActorRef, ActorSystem }
import akka.dispatch.Future
import akka.pattern.{ ask scalaAsk, pipe scalaPipe, gracefulStop scalaGracefulStop, after scalaAfter }
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.util.Duration
/**
* <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
@ -47,7 +47,7 @@ object Patterns {
/**
* <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* Sends a message asynchronously and returns a [[scala.concurrent.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
@ -77,7 +77,7 @@ object Patterns {
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]]
/**
* Register an onComplete callback on this [[akka.dispatch.Future]] to send
* Register an onComplete callback on this [[scala.concurrent.Future]] to send
* the result to the given actor reference. Returns the original Future to
* allow method chaining.
*
@ -94,27 +94,27 @@ object Patterns {
def pipe[T](future: Future[T]): PipeableFuture[T] = scalaPipe(future)
/**
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
* Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when
* existing messages of the target actor has been processed and the actor has been
* terminated.
*
* Useful when you need to wait for termination or compose ordered termination of several actors.
*
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] =
scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]
/**
* Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided Callable
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration.
*/
def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] =
scalaAfter(duration, scheduler)(value.call())(context)
/**
* Returns a [[akka.dispatch.Future]] that will be completed with the success or failure of the provided value
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value
* after the specified duration.
*/
def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] =

View file

@ -5,17 +5,19 @@ package akka.pattern
import language.implicitConversions
import akka.dispatch.Future
import scala.concurrent.{ Future }
import akka.actor.{ Status, ActorRef }
trait PipeToSupport {
final class PipeableFuture[T](val future: Future[T]) {
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = null): Future[T] =
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = null): Future[T] = {
future onComplete {
case Right(r) recipient ! r
case Left(f) recipient ! Status.Failure(f)
}
future
}
def to(recipient: ActorRef): PipeableFuture[T] = to(recipient, null)
def to(recipient: ActorRef, sender: ActorRef): PipeableFuture[T] = {
pipeTo(recipient)(sender)
@ -24,7 +26,7 @@ trait PipeToSupport {
}
/**
* Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]:
* Import this implicit conversion to gain the `pipeTo` method on [[scala.concurrent.Future]]:
*
* {{{
* import akka.pattern.pipe

View file

@ -15,7 +15,7 @@ import akka.actor._
*
* <ul>
* <li><b>ask:</b> create a temporary one-off actor for receiving a reply to a
* message and complete a [[akka.dispatch.Future]] with it; returns said
* message and complete a [[scala.concurrent.Future]] with it; returns said
* Future.</li>
* <li><b>pipeTo:</b> feed eventually computed value of a future to an actor as
* a message.</li>

View file

@ -16,10 +16,8 @@ import scala.collection.JavaConversions.iterableAsScalaIterable
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeUnit
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.util.Unsafe
import akka.dispatch.Dispatchers
import scala.annotation.tailrec
import scala.runtime.ScalaRunTime
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
@ -1078,7 +1076,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
case (sender, message)
val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider
val asker = akka.pattern.PromiseActorRef(provider, within)
asker.result.pipeTo(sender)
asker.result.future.pipeTo(sender)
toAll(asker, routeeProvider.routees)
}
}

View file

@ -350,10 +350,8 @@ object CompactByteString {
/**
* Creates a new CompactByteString by copying a byte array.
*/
def apply(bytes: Array[Byte]): CompactByteString = {
if (bytes.isEmpty) empty
else ByteString.ByteString1C(bytes.clone)
}
def apply(bytes: Array[Byte]): CompactByteString =
if (bytes.isEmpty) empty else ByteString.ByteString1C(bytes.clone)
/**
* Creates a new CompactByteString by copying bytes.
@ -395,10 +393,8 @@ object CompactByteString {
/**
* Creates a new CompactByteString by encoding a String with a charset.
*/
def apply(string: String, charset: String): CompactByteString = {
if (string.isEmpty) empty
else ByteString.ByteString1C(string.getBytes(charset))
}
def apply(string: String, charset: String): CompactByteString =
if (string.isEmpty) empty else ByteString.ByteString1C(string.getBytes(charset))
/**
* Creates a new CompactByteString by copying length bytes starting at offset from
@ -452,7 +448,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
this
}
protected def fillByteBuffer(len: Int, byteOrder: ByteOrder)(fill: ByteBuffer Unit): this.type = {
@inline protected final def fillByteBuffer(len: Int, byteOrder: ByteOrder)(fill: ByteBuffer Unit): this.type = {
fillArray(len) {
case (array, start)
val buffer = ByteBuffer.wrap(array, start, len)

View file

@ -6,7 +6,7 @@ package akka.camel
import akka.camel.internal._
import akka.util.Timeout
import akka.dispatch.Future
import scala.concurrent.Future
import java.util.concurrent.TimeoutException
import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.pattern._

View file

@ -8,7 +8,7 @@ package docs.circuitbreaker
import scala.concurrent.util.duration._ // small d is important here
import akka.pattern.CircuitBreaker
import akka.actor.Actor
import akka.dispatch.Future
import scala.concurrent.Future
import akka.event.Logging
//#imports1

View file

@ -6,13 +6,13 @@ package docs.circuitbreaker;
//#imports1
import akka.actor.UntypedActor;
import akka.dispatch.Future;
import scala.concurrent.Future;
import akka.event.LoggingAdapter;
import scala.concurrent.util.Duration;
import akka.pattern.CircuitBreaker;
import akka.event.Logging;
import static akka.dispatch.Futures.future;
import static scala.concurrent.Futures.future;
import java.util.concurrent.Callable;

View file

@ -10,8 +10,8 @@ import akka.actor.Props;
//#imports
//#import-future
import akka.dispatch.Future;
import akka.dispatch.Futures;
import scala.concurrent.Future;
import scala.concurrent.Futures;
import akka.dispatch.Mapper;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
@ -33,7 +33,7 @@ import akka.actor.Terminated;
//#import-gracefulStop
import static akka.pattern.Patterns.gracefulStop;
import akka.dispatch.Future;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import akka.pattern.AskTimeoutException;
@ -42,8 +42,8 @@ import akka.pattern.AskTimeoutException;
//#import-askPipe
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import akka.dispatch.Future;
import akka.dispatch.Futures;
import scala.concurrent.Future;
import scala.concurrent.Futures;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;

View file

@ -14,29 +14,29 @@ import akka.util.Timeout;
import scala.concurrent.util.Duration;
import akka.japi.Function;
import java.util.concurrent.Callable;
import static akka.dispatch.Futures.future;
import static scala.concurrent.Futures.future;
import static java.util.concurrent.TimeUnit.SECONDS;
//#imports2
//#imports3
import static akka.dispatch.Futures.sequence;
import static scala.concurrent.Futures.sequence;
//#imports3
//#imports4
import static akka.dispatch.Futures.traverse;
import static scala.concurrent.Futures.traverse;
//#imports4
//#imports5
import akka.japi.Function2;
import static akka.dispatch.Futures.fold;
import static scala.concurrent.Futures.fold;
//#imports5
//#imports6
import static akka.dispatch.Futures.reduce;
import static scala.concurrent.Futures.reduce;
//#imports6

View file

@ -16,7 +16,7 @@ import akka.routing.*;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
import scala.concurrent.Await;
import akka.dispatch.Future;
import scala.concurrent.Future;
import akka.dispatch.Dispatchers;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;

View file

@ -13,7 +13,7 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import scala.concurrent.util.Duration;
import akka.util.Timeout;
import akka.dispatch.Future;
import scala.concurrent.Future;
import scala.concurrent.Await;
//#parentActor

View file

@ -97,7 +97,7 @@ Method dispatch semantics
Methods returning:
* ``void`` will be dispatched with ``fire-and-forget`` semantics, exactly like ``ActorRef.tell``
* ``akka.dispatch.Future<?>`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask``
* ``scala.concurrent.Future<?>`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask``
* ``scala.Option<?>`` or ``akka.japi.Option<?>`` will use ``send-request-reply`` semantics, but *will* block to wait for an answer,
and return None if no answer was produced within the timeout, or scala.Some/akka.japi.Some containing the result otherwise.
Any exception that was thrown during this call will be rethrown.

View file

@ -12,7 +12,7 @@ import akka.event.Logging
//#imports1
import akka.dispatch.Future
import scala.concurrent.Future
import akka.actor.{ ActorRef, ActorSystem }
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers

View file

@ -79,7 +79,7 @@ class FutureDocSpec extends AkkaSpec {
val msg = "hello"
implicit val timeout = Timeout(5 seconds)
//#map-to
import akka.dispatch.Future
import scala.concurrent.Future
import akka.pattern.ask
val future: Future[String] = ask(actor, msg).mapTo[String]
@ -90,7 +90,7 @@ class FutureDocSpec extends AkkaSpec {
"demonstrate usage of simple future eval" in {
//#future-eval
import scala.concurrent.Await
import akka.dispatch.Future
import scala.concurrent.Future
import scala.concurrent.util.duration._
val future = Future {

View file

@ -9,7 +9,7 @@ import language.postfixOps
import akka.testkit.TestProbe
import scala.concurrent.util.duration._
import akka.actor._
import akka.dispatch.Futures
import scala.concurrent.Futures
//#imports-test-probe

View file

@ -97,7 +97,7 @@ Method dispatch semantics
Methods returning:
* ``Unit`` will be dispatched with ``fire-and-forget`` semantics, exactly like ``ActorRef.tell``
* ``akka.dispatch.Future[_]`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask``
* ``scala.concurrent.Future[_]`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask``
* ``scala.Option[_]`` or ``akka.japi.Option<?>`` will use ``send-request-reply`` semantics, but *will* block to wait for an answer,
and return None if no answer was produced within the timeout, or scala.Some/akka.japi.Some containing the result otherwise.
Any exception that was thrown during this call will be rethrown.

View file

@ -17,7 +17,7 @@ import akka.event.{ LoggingAdapter, Logging }
import scala.util.control.NoStackTrace
import akka.event.LoggingReceive
import java.net.InetSocketAddress
import akka.dispatch.Future
import scala.concurrent.Future
import akka.actor.{ OneForOneStrategy, SupervisorStrategy, Status, Address, PoisonPill }
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit.MILLISECONDS

View file

@ -16,7 +16,7 @@ import com.typesafe.config.ConfigFactory
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeoutException
import akka.pattern.{ ask, pipe, AskTimeoutException }
import akka.dispatch.Future
import scala.concurrent.Future
import scala.util.control.NoStackTrace
import akka.event.{ LoggingAdapter, Logging }
import java.net.{ InetSocketAddress, ConnectException }

View file

@ -10,7 +10,7 @@ import akka.remote.netty.NettyRemoteTransport
import akka.actor.Actor
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.dispatch.Future
import scala.concurrent.Future
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import java.util.concurrent.atomic.AtomicBoolean

View file

@ -6,7 +6,7 @@ package akka.remote
import akka.testkit._
import akka.actor._
import com.typesafe.config._
import akka.dispatch.Future
import scala.concurrent.Future
import scala.concurrent.Await
import akka.pattern.ask

View file

@ -8,7 +8,7 @@ import language.postfixOps
import akka.testkit._
import akka.actor._
import com.typesafe.config._
import akka.dispatch.Future
import scala.concurrent.Future
import akka.pattern.ask
import java.io.File
import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController }

View file

@ -9,8 +9,7 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.event.Logging.Warning
import akka.dispatch.{ Future, Promise }
import scala.concurrent.Await
import scala.concurrent.{ Future, Promise, Await }
import scala.concurrent.util.duration._
import akka.actor.ActorSystem
import akka.pattern.ask

View file

@ -6,9 +6,8 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import scala.concurrent.Await
import scala.concurrent.{ Future, Await }
import scala.concurrent.util.duration._
import akka.dispatch.Future
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])

View file

@ -17,7 +17,7 @@ import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import scala.concurrent.Await;
import akka.dispatch.Future;
import scala.concurrent.Future;
import static akka.pattern.Patterns.ask;
import akka.testkit.AkkaSpec;
import akka.testkit.EventFilter;

View file

@ -17,7 +17,7 @@ import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import scala.concurrent.Await;
import akka.dispatch.Future;
import scala.concurrent.Future;
import static akka.pattern.Patterns.ask;
import akka.testkit.AkkaSpec;
import akka.testkit.EventFilter;