diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index c092ccceb2..4053a2d7f2 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -93,7 +93,7 @@ public class JavaFutureTests { Throwable exception = new NullPointerException(); cf.failure(exception); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - assertEquals(f.value().get().left().get(), exception); + assertEquals(f.value().get().failed().get(), exception); } @Test diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index acd5828dc9..72efaef0d4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -15,6 +15,8 @@ import akka.testkit._ import akka.dispatch.MessageDispatcher import akka.pattern.ask import java.net.{ Socket, InetSocketAddress, InetAddress, SocketAddress } +import scala.util.Failure +import annotation.tailrec object IOActorSpec { @@ -256,7 +258,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { def run(n: Int) { future onComplete { - case Left(e) if check(n, e) ⇒ + case Failure(e) if check(n, e) ⇒ if (delay.isDefined) { executor match { case m: MessageDispatcher ⇒ m.prerequisites.scheduler.scheduleOnce(delay.get)(run(n + 1)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 426f940b5d..f263a2f188 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.Await import scala.concurrent.util.duration._ import akka.util.Timeout import scala.concurrent.Future +import scala.util.Success object LocalActorRefProviderSpec { val config = """ @@ -53,9 +54,9 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi implicit val timeout = Timeout(5 seconds) val actors = for (j ← 1 to 4) yield Future(system.actorOf(Props(c ⇒ { case _ ⇒ }), address)) val set = Set() ++ actors.map(a ⇒ Await.ready(a, timeout.duration).value match { - case Some(Right(a: ActorRef)) ⇒ 1 - case Some(Left(ex: InvalidActorNameException)) ⇒ 2 - case x ⇒ x + case Some(Success(a: ActorRef)) ⇒ 1 + case Some(Success(ex: InvalidActorNameException)) ⇒ 2 + case x ⇒ x }) set must be === Set(1, 2) } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 96c9f6a0d9..a151330b80 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -21,6 +21,7 @@ import akka.pattern.ask import java.lang.{ IllegalStateException, ArithmeticException } import java.util.concurrent._ import scala.reflect.ClassTag +import scala.util.{ Failure, Success, Try } object FutureSpec { @@ -75,22 +76,22 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "completed with a result" must { val result = "test value" - val future = Promise[String]().complete(Right(result)).future + val future = Promise[String]().complete(Success(result)).future behave like futureWithResult(_(future, result)) } "completed with an exception" must { val message = "Expected Exception" - val future = Promise[String]().complete(Left(new RuntimeException(message))).future + val future = Promise[String]().complete(Failure(new RuntimeException(message))).future behave like futureWithException[RuntimeException](_(future, message)) } "completed with an InterruptedException" must { val message = "Boxed InterruptedException" - val future = Promise[String]().complete(Left(new InterruptedException(message))).future + val future = Promise[String]().complete(Failure(new InterruptedException(message))).future behave like futureWithException[RuntimeException](_(future, message)) } "completed with a NonLocalReturnControl" must { val result = "test value" - val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))).future + val future = Promise[String]().complete(Failure(new NonLocalReturnControl[String]("test", result))).future behave like futureWithResult(_(future, result)) } @@ -155,13 +156,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "pass checks" in { filterException[ArithmeticException] { check({ (future: Future[Int], actions: List[FutureAction]) ⇒ - def wrap[T](f: Future[T]): Either[Throwable, T] = FutureSpec.ready(f, timeout.duration).value.get + def wrap[T](f: Future[T]): Try[T] = FutureSpec.ready(f, timeout.duration).value.get val result = (future /: actions)(_ /: _) val expected = (wrap(future) /: actions)(_ /: _) ((wrap(result), expected) match { - case (Right(a), Right(b)) ⇒ a == b - case (Left(a), Left(b)) if a.toString == b.toString ⇒ true - case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ a.getClass.toString == b.getClass.toString + case (Success(a), Success(b)) ⇒ a == b + case (Failure(a), Failure(b)) if a.toString == b.toString ⇒ true + case (Failure(a), Failure(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ a.getClass.toString == b.getClass.toString case _ ⇒ false }) :| result.value.get.toString + " is expected to be " + expected.toString }, minSuccessful(10000), workers(4)) @@ -360,7 +361,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "andThen like a boss" in { val q = new LinkedBlockingQueue[Int] for (i ← 1 to 1000) { - Await.result(Future { q.add(1); 3 } andThen { case _ ⇒ q.add(2) } andThen { case Right(0) ⇒ q.add(Int.MaxValue) } andThen { case _ ⇒ q.add(3); }, timeout.duration) must be(3) + Await.result(Future { q.add(1); 3 } andThen { case _ ⇒ q.add(2) } andThen { case Success(0) ⇒ q.add(Int.MaxValue) } andThen { case _ ⇒ q.add(3); }, timeout.duration) must be(3) q.poll() must be(1) q.poll() must be(2) q.poll() must be(3) @@ -821,7 +822,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa p1 must not be ('completed) f4 must not be ('completed) - p1 complete Right("Hello") + p1 complete Success("Hello") FutureSpec.ready(latch(7), TestLatch.DefaultTimeout) @@ -889,7 +890,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa def futureWithResult(f: ((Future[Any], Any) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future must be('completed)) } - "contain a value" in { f((future, result) ⇒ future.value must be(Some(Right(result)))) } + "contain a value" in { f((future, result) ⇒ future.value must be(Some(Success(result)))) } "return result with 'get'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } "return result with 'Await.result'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } "not timeout" in { f((future, _) ⇒ FutureSpec.ready(future, 0 millis)) } @@ -938,7 +939,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f((future, message) ⇒ { future.value must be('defined) future.value.get must be('left) - future.value.get.left.get.getMessage must be(message) + future.value.get match { + case Failure(f) ⇒ f.getMessage must be(message) + } }) } "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) } @@ -976,23 +979,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa case class IntDiv(n: Int) extends IntAction { def apply(that: Int) = that / n } sealed trait FutureAction { - def /:(that: Either[Throwable, Int]): Either[Throwable, Int] + def /:(that: Try[Int]): Try[Int] def /:(that: Future[Int]): Future[Int] } case class MapAction(action: IntAction) extends FutureAction { - def /:(that: Either[Throwable, Int]): Either[Throwable, Int] = that match { - case Left(e) ⇒ that - case Right(r) ⇒ try { Right(action(r)) } catch { case e if NonFatal(e) ⇒ Left(e) } - } + def /:(that: Try[Int]): Try[Int] = that map action.apply def /:(that: Future[Int]): Future[Int] = that map action.apply } case class FlatMapAction(action: IntAction) extends FutureAction { - def /:(that: Either[Throwable, Int]): Either[Throwable, Int] = that match { - case Left(e) ⇒ that - case Right(r) ⇒ try { Right(action(r)) } catch { case e if NonFatal(e) ⇒ Left(e) } - } + def /:(that: Try[Int]): Try[Int] = that map action.apply def /:(that: Future[Int]): Future[Int] = that flatMap (n ⇒ Future.successful(action(n))) } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index cbb098defc..31a314da2e 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.Await import akka.testkit.DefaultTimeout import akka.actor.{ Props, ActorRef } import akka.util.Timeout +import scala.util.Failure class AskSpec extends AkkaSpec { @@ -22,8 +23,8 @@ class AskSpec extends AkkaSpec { val f = dead.ask(42)(1 second) f.isCompleted must be(true) f.value.get match { - case Left(_: AskTimeoutException) ⇒ - case v ⇒ fail(v + " was not Left(AskTimeoutException)") + case Failure(_: AskTimeoutException) ⇒ + case v ⇒ fail(v + " was not Left(AskTimeoutException)") } } @@ -33,8 +34,8 @@ class AskSpec extends AkkaSpec { val f = empty ? 3.14 f.isCompleted must be(true) f.value.get match { - case Left(_: AskTimeoutException) ⇒ - case v ⇒ fail(v + " was not Left(AskTimeoutException)") + case Failure(_: AskTimeoutException) ⇒ + case v ⇒ fail(v + " was not Left(AskTimeoutException)") } } diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 4cd5a876d5..90cf83a3a9 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -254,7 +254,7 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) val b = system.actorOf(Props(new FooActor)) system stop b - val c = system.actorOf(Props().withCreator(new UntypedActorFactory { + val c = system.actorOf(Props.empty.withCreator(new UntypedActorFactory { def create() = new FooUntypedActor })) system stop c diff --git a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala index 981d407472..a5f65b6a7e 100644 --- a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala @@ -11,7 +11,7 @@ import org.scalacheck.Gen._ import scala.collection.mutable.Builder -import java.nio.{ ByteBuffer, ShortBuffer, IntBuffer, FloatBuffer, DoubleBuffer } +import java.nio.{ ByteBuffer } import java.nio.ByteOrder, ByteOrder.{ BIG_ENDIAN, LITTLE_ENDIAN } import java.lang.Float.floatToRawIntBits import java.lang.Double.doubleToRawLongBits diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index e71782869d..fa85a19ac2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -8,6 +8,7 @@ import akka.dispatch._ import akka.routing._ import akka.event._ import akka.util.{ Switch, Helpers } +import scala.util.{ Success, Failure } import scala.util.control.NonFatal import scala.concurrent.{ Future, Promise } import java.util.concurrent.atomic.AtomicLong @@ -354,8 +355,7 @@ class LocalActorRefProvider( def provider: ActorRefProvider = LocalActorRefProvider.this - override def stop(): Unit = stopped switchOn { terminationPromise.complete(causeOfTermination.toLeft(())) } - + override def stop(): Unit = stopped switchOn { terminationPromise.complete(causeOfTermination.map(Failure(_)).getOrElse(Success(()))) } override def isTerminated: Boolean = stopped.isOn override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match { diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index eed2f56efd..aaa5432815 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import scala.reflect.ClassTag import akka.serialization.{ JavaSerializer, SerializationExtension } import java.io.ObjectStreamException +import scala.util.{ Try, Success, Failure } /** * A TypedActorFactory is something that can created TypedActor instances. @@ -302,8 +303,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi if (m.returnsFuture_?) { val s = sender m(me).asInstanceOf[Future[Any]] onComplete { - case Left(f) ⇒ s ! Status.Failure(f) - case Right(r) ⇒ s ! r + case Failure(f) ⇒ s ! Status.Failure(f) + case Success(r) ⇒ s ! r } } else { sender ! m(me) @@ -408,9 +409,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case m if m.returnsJOption_? || m.returnsOption_? ⇒ val f = ask(actor, m)(timeout) (try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match { - case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None - case Some(Right(joption)) ⇒ joption.asInstanceOf[AnyRef] - case Some(Left(ex)) ⇒ throw ex + case None | Some(Success(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None + case Some(t: Try[_]) ⇒ t.get.asInstanceOf[AnyRef] } case m ⇒ Await.result(ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef] } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 0ab004fc59..113215cd23 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -10,6 +10,7 @@ import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExe import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } import java.util.concurrent.{ Executor, ExecutorService, ExecutionException, Callable, TimeoutException } +import scala.util.{ Try, Success, Failure } /** * ExecutionContexts is the Java API for ExecutionContexts @@ -227,10 +228,10 @@ abstract class OnFailure extends japi.CallbackBridge[Throwable] { * * Java API */ -abstract class OnComplete[-T] extends japi.CallbackBridge[Either[Throwable, T]] { - protected final override def internal(value: Either[Throwable, T]): Unit = value match { - case Left(t) ⇒ onComplete(t, null.asInstanceOf[T]) - case Right(r) ⇒ onComplete(null, r) +abstract class OnComplete[-T] extends japi.CallbackBridge[Try[T]] { + protected final override def internal(value: Try[T]): Unit = value match { + case Failure(t) ⇒ onComplete(t, null.asInstanceOf[T]) + case Success(r) ⇒ onComplete(null, r) } /** diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 42bc9fcb9b..ba9558aa84 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -12,6 +12,7 @@ import scala.annotation.tailrec import scala.util.control.NonFatal import scala.concurrent.{ Future, Promise, ExecutionContext } import akka.util.{ Timeout, Unsafe } +import scala.util.{ Success, Failure } /** * This is what is used to complete a Future that is returned from an ask/? call, @@ -249,13 +250,12 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide override def !(message: Any)(implicit sender: ActorRef = null): Unit = state match { case Stopped | _: StoppedWithPath ⇒ provider.deadLetters ! message - case _ ⇒ if (!(result.tryComplete { + case _ ⇒ if (!(result.tryComplete( message match { - case Status.Success(r) ⇒ Right(r) - case Status.Failure(f) ⇒ Left(f) - case other ⇒ Right(other) - } - })) provider.deadLetters ! message + case Status.Success(r) ⇒ Success(r) + case Status.Failure(f) ⇒ Failure(f) + case other ⇒ Success(other) + }))) provider.deadLetters ! message } override def sendSystemMessage(message: SystemMessage): Unit = message match { @@ -278,7 +278,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide @tailrec override def stop(): Unit = { def ensureCompleted(): Unit = { - result.tryComplete(Left(new ActorKilledException("Stopped"))) + result tryComplete Failure(new ActorKilledException("Stopped")) val watchers = clearWatchers() if (!watchers.isEmpty) { val termination = Terminated(this)(existenceConfirmed = true, 0) @@ -308,7 +308,7 @@ private[akka] object PromiseActorRef { implicit val ec = provider.dispatcher // TODO should we take an ExecutionContext in the method signature? val result = Promise[Any]() val a = new PromiseActorRef(provider, result) - val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) } + val f = provider.scheduler.scheduleOnce(timeout.duration) { result tryComplete Failure(new AskTimeoutException("Timed out")) } result.future onComplete { _ ⇒ try a.stop() finally f.cancel() } a } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 64c2e7c21a..a9cb8bc0c5 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -13,6 +13,7 @@ import scala.concurrent.{ ExecutionContext, Future, Promise, Await } import scala.concurrent.util.{ Duration, Deadline } import scala.concurrent.util.duration._ import scala.util.control.NonFatal +import scala.util.Success /** * Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread @@ -306,8 +307,8 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati val deadline = callTimeout.fromNow val bodyFuture = try body catch { case NonFatal(t) ⇒ Future.failed(t) } bodyFuture.onComplete({ - case Right(_) if !deadline.isOverdue() ⇒ callSucceeds() - case _ ⇒ callFails() + case s: Success[_] if !deadline.isOverdue() ⇒ callSucceeds() + case _ ⇒ callFails() })(CircuitBreaker.syncExecutionContext) bodyFuture } diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 4a11da219a..3db168e4c4 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -9,6 +9,7 @@ import akka.util.{ Timeout } import akka.dispatch.{ Unwatch, Watch } import scala.concurrent.Future import scala.concurrent.util.Duration +import scala.util.Success trait GracefulStopSupport { /** @@ -45,8 +46,8 @@ trait GracefulStopSupport { internalTarget.sendSystemMessage(Watch(target, ref)) val f = ref.result.future f onComplete { // Just making sure we're not leaking here - case Right(Terminated(`target`)) ⇒ () - case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)) + case Success(Terminated(`target`)) ⇒ () + case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)) } target ! PoisonPill f map { diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala index 8eb3859f12..5563a908de 100644 --- a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala @@ -6,6 +6,7 @@ package akka.pattern import language.implicitConversions import scala.concurrent.{ Future, ExecutionContext } +import scala.util.{ Failure, Success } import akka.actor.{ Status, ActorRef } trait PipeToSupport { @@ -13,8 +14,8 @@ trait PipeToSupport { final class PipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) { def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = null): Future[T] = { future onComplete { - case Right(r) ⇒ recipient ! r - case Left(f) ⇒ recipient ! Status.Failure(f) + case Success(r) ⇒ recipient ! r + case Failure(f) ⇒ recipient ! Status.Failure(f) } future } diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 4dff298ecb..159ec437c5 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -24,6 +24,7 @@ import akka.util.Timeout import akka.camel.internal.CamelExchangeAdapter import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage } import support.TypeConverterSupport +import scala.util.{ Failure, Success, Try } /** * For internal use only. @@ -155,18 +156,18 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex callback.done(true) true // done sync } else { - val action: PartialFunction[Either[Throwable, Any], Unit] = + val action: PartialFunction[Try[Any], Unit] = if (exchange.isOutCapable) { - case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) - case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) - case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) - case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) + case Success(failure: FailureResult) ⇒ exchange.setFailure(failure) + case Success(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) + case Failure(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Failure(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) } else { - case Right(Ack) ⇒ () /* no response message to set */ - case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) - case Right(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) - case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) - case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) + case Success(Ack) ⇒ () /* no response message to set */ + case Success(failure: FailureResult) ⇒ exchange.setFailure(failure) + case Success(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) + case Failure(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Failure(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) } val async = try actorFor(endpoint.path).ask(messageFor(exchange))(Timeout(endpoint.replyTimeout)) catch { case NonFatal(e) ⇒ Future.failed(e) } implicit val ec = camel.system.dispatcher // FIXME which ExecutionContext should be used here? diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index e9d5382843..6510f6fd67 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -19,6 +19,7 @@ import scala.concurrent.util.duration._ import akka.util.Timeout import org.scalatest.matchers.MustMatchers import akka.testkit._ +import scala.util.Success /** * Tests the features of the Camel Producer. diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala index 11178277b9..ba853083bf 100644 --- a/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerTest.scala @@ -53,8 +53,8 @@ class UntypedProducerTest extends WordSpec with MustMatchers with BeforeAndAfter filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { val future = producer.ask(message)(timeout).failed - Await.ready(future, timeout).value match { - case Some(Right(e: AkkaCamelException)) ⇒ + Await.result(future, timeout) match { + case e: AkkaCamelException ⇒ e.getMessage must be("failure") e.headers must be(Map(CamelMessage.MessageExchangeId -> "123")) case unexpected ⇒ fail("Actor responded with unexpected message:" + unexpected) diff --git a/akka-dataflow/src/main/scala/akka/dataflow/package.scala b/akka-dataflow/src/main/scala/akka/dataflow/package.scala index bdf1d79aa2..9f4e6a0da2 100644 --- a/akka-dataflow/src/main/scala/akka/dataflow/package.scala +++ b/akka-dataflow/src/main/scala/akka/dataflow/package.scala @@ -9,6 +9,7 @@ import language.implicitConversions import scala.util.continuations._ import scala.concurrent.{ Promise, Future, ExecutionContext } import scala.util.control.NonFatal +import scala.util.Failure package object dataflow { /** @@ -33,10 +34,10 @@ package object dataflow { new Runnable { def run = try { (reify(body) foreachFull (r ⇒ p.success(r).future, f ⇒ p.failure(f).future): Future[Any]) onFailure { - case NonFatal(e) ⇒ p tryComplete Left(e) + case NonFatal(e) ⇒ p tryComplete Failure(e) } } catch { - case NonFatal(e) ⇒ p tryComplete Left(e) + case NonFatal(e) ⇒ p tryComplete Failure(e) } }) p.future diff --git a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala index c5d543f84d..18f2d5068d 100644 --- a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala +++ b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala @@ -121,7 +121,7 @@ class DataflowSpec extends AkkaSpec with DefaultTimeout { val a, b, c = Promise[Int]() val result2 = flow { - val n = (a << c).value.get.right.get + 10 + val n = (a << c).value.get.get + 10 b << (c() - 2) a() + n * b() } diff --git a/akka-docs/scala/code/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/docs/future/FutureDocSpec.scala index dc28fd7185..90da94bae5 100644 --- a/akka-docs/scala/code/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/docs/future/FutureDocSpec.scala @@ -7,18 +7,19 @@ import language.postfixOps import akka.testkit._ import akka.actor.{ Actor, Props } -import akka.actor.Status.Failure +import akka.actor.Status import akka.util.Timeout import scala.concurrent.util.duration._ import java.lang.IllegalStateException import scala.concurrent.{ Await, ExecutionContext, Future, Promise } +import scala.util.{ Failure, Success } object FutureDocSpec { class MyActor extends Actor { def receive = { case x: String ⇒ sender ! x.toUpperCase - case x: Int if x < 0 ⇒ sender ! Failure(new ArithmeticException("Negative values not supported")) + case x: Int if x < 0 ⇒ sender ! Status.Failure(new ArithmeticException("Negative values not supported")) case x: Int ⇒ sender ! x } } @@ -312,7 +313,7 @@ class FutureDocSpec extends AkkaSpec { def watchSomeTV = () //#and-then val result = Future { loadPage(url) } andThen { - case Left(exception) ⇒ log(exception) + case Failure(exception) ⇒ log(exception) } andThen { case _ ⇒ watchSomeTV } @@ -358,8 +359,8 @@ class FutureDocSpec extends AkkaSpec { def doSomethingOnFailure(t: Throwable) = () //#onComplete future onComplete { - case Right(result) ⇒ doSomethingOnSuccess(result) - case Left(failure) ⇒ doSomethingOnFailure(failure) + case Success(result) ⇒ doSomethingOnSuccess(result) + case Failure(failure) ⇒ doSomethingOnFailure(failure) } //#onComplete Await.result(future, 1 second) must be("foo") diff --git a/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala b/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala index 94ddceab2f..830830dffa 100644 --- a/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala @@ -4,6 +4,7 @@ package docs.testkit import language.postfixOps +import scala.util.Success //#imports-test-probe import akka.testkit.TestProbe @@ -129,7 +130,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { // hypothetical message stimulating a '42' answer val future = actorRef ? Say42 val result = future.value.get match { - case Right(x: Int) ⇒ x + case Success(x: Int) ⇒ x } result must be(42) //#test-behavior diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 4e3047b920..2d011e1d51 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -24,7 +24,11 @@ object AkkaBuild extends Build { lazy val buildSettings = Seq( organization := "com.typesafe.akka", version := "2.1-SNAPSHOT", - scalaVersion := "2.10.0-M6" + scalaVersion := "2.10.0-SNAPSHOT", + scalaVersion in update <<= (scalaVersion) apply { + case "2.10.0-SNAPSHOT" => "2.10.0-M6" + case x => x + } ) lazy val akka = Project( @@ -347,6 +351,7 @@ object AkkaBuild extends Build { // Settings override lazy val settings = super.settings ++ buildSettings ++ Seq( + resolvers += "Scala Community 2.10.0-SNAPSHOT" at "https://scala-webapps.epfl.ch/jenkins/job/community-nightly/ws/target/repositories/8e83577d99af1d718fe369c4a4ee92737b9cf669", resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/", resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/", shellPrompt := { s => Project.extract(s).currentProject.id + " > " } @@ -580,7 +585,10 @@ object Dependency { val config = "com.typesafe" % "config" % "0.5.0" // ApacheV2 val netty = "io.netty" % "netty" % "3.5.4.Final" // ApacheV2 val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD - val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) + // TODO FIXME use live version + // val scalaStm = "org.scala-tools" %% "scala-stm" % "0.6" // Modified BSD (Scala) + val scalaStm = "org.scala-tools" % "scala-stm" % "0.7-SNAPSHOT" // Modified BSD (Scala) + val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT val zeroMQClient = "org.zeromq" % "zeromq-scala-binding" % "0.0.6" cross CrossVersion.full // ApacheV2 val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 diff --git a/project/plugins.sbt b/project/plugins.sbt index b6ffd705bf..2f4e1d369b 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,7 @@ resolvers += Classpaths.typesafeResolver -addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M4") +addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M4" cross CrossVersion.full) addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.4.0")