From aa4ad6f3c3ec717026cabaf28dbdcafa1bc536e0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 22 Jul 2012 21:40:09 +0200 Subject: [PATCH] Making it green --- .../src/main/scala/akka/dispatch/Future.scala | 58 ++++++++++++++++- .../src/test/scala/akka/agent/AgentSpec.scala | 1 + .../main/scala/akka/camel/Activation.scala | 8 +-- .../internal/component/ActorComponent.scala | 65 +++++++------------ .../component/ActorProducerTest.scala | 1 + .../scala/akka/cluster/LargeClusterSpec.scala | 2 +- .../UnreachableNodeRejoinsClusterSpec.scala | 2 +- .../docs/actor/TypedActorDocTestBase.java | 2 +- .../docs/actor/UntypedActorDocTestBase.java | 2 +- .../actor/japi/FaultHandlingDocSample.java | 2 +- .../code/docs/future/FutureDocTestBase.java | 41 ++++++------ .../scala/code/docs/actor/ActorDocSpec.scala | 2 +- .../docs/actor/FaultHandlingDocSample.scala | 1 + .../scala/code/docs/camel/Consumers.scala | 8 ++- .../scala/code/docs/camel/CustomRoute.scala | 6 ++ .../scala/code/docs/camel/Introduction.scala | 4 +- .../scala/code/docs/camel/Producers.scala | 3 +- .../scala/code/docs/camel/QuartzExample.scala | 36 +++++----- .../code/docs/future/FutureDocSpec.scala | 4 +- project/AkkaBuild.scala | 11 ++-- 20 files changed, 155 insertions(+), 104 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 14cb9c2014..80d57a6dca 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -5,11 +5,63 @@ package akka.dispatch import scala.runtime.{ BoxedUnit, AbstractPartialFunction } -import akka.japi.{ Function ⇒ JFunc, Option ⇒ JOption } -import scala.concurrent.{ Future, Promise, ExecutionContext } +import akka.japi.{ Function ⇒ JFunc, Option ⇒ JOption, Procedure } +import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutor, ExecutionContextExecutorService } import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } -import java.util.concurrent.{ ExecutionException, Callable, TimeoutException } +import java.util.concurrent.{ Executor, ExecutorService, ExecutionException, Callable, TimeoutException } + +/** + * ExecutionContexts is the Java API for ExecutionContexts + */ +object ExecutionContexts { + /** + * Returns a new ExecutionContextExecutor which will delegate execution to the underlying Executor, + * and which will use the default error reporter. + * + * @param executor the Executor which will be used for the ExecutionContext + * @return a new ExecutionContext + */ + def fromExecutor(executor: Executor): ExecutionContextExecutor = + ExecutionContext.fromExecutor(executor) + + /** + * Returns a new ExecutionContextExecutor which will delegate execution to the underlying Executor, + * and which will use the provided error reporter. + * + * @param executor the Executor which will be used for the ExecutionContext + * @param errorReporter a Procedure that will log any exceptions passed to it + * @return a new ExecutionContext + */ + def fromExecutor(executor: Executor, errorReporter: Procedure[Throwable]): ExecutionContextExecutor = + ExecutionContext.fromExecutor(executor, errorReporter.apply) + + /** + * Returns a new ExecutionContextExecutorService which will delegate execution to the underlying ExecutorService, + * and which will use the default error reporter. + * + * @param executor the ExecutorService which will be used for the ExecutionContext + * @return a new ExecutionContext + */ + def fromExecutorService(executorService: ExecutorService): ExecutionContextExecutorService = + ExecutionContext.fromExecutorService(executorService) + + /** + * Returns a new ExecutionContextExecutorService which will delegate execution to the underlying ExecutorService, + * and which will use the provided error reporter. + * + * @param executor the ExecutorService which will be used for the ExecutionContext + * @param errorReporter a Procedure that will log any exceptions passed to it + * @return a new ExecutionContext + */ + def fromExecutorService(executorService: ExecutorService, errorReporter: Procedure[Throwable]): ExecutionContextExecutorService = + ExecutionContext.fromExecutorService(executorService, errorReporter.apply) + + /** + * @return a reference to the global ExecutionContext + */ + def global(): ExecutionContext = ExecutionContext.global +} /** * Futures is the Java API for Futures and Promises diff --git a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala index 99a3ab6dbc..dd57abe33f 100644 --- a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala +++ b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala @@ -54,6 +54,7 @@ class AgentSpec extends AkkaSpec { } "maintain order between alter and alterOff" in { + import system.dispatcher val l1, l2 = new CountDownLatch(1) val agent = Agent("a") diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index e2da7f0b2f..768ea22b3a 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -52,10 +52,10 @@ trait Activation { * @param timeout the timeout for the Future */ def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] = - (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef] { + (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ case EndpointActivated(_) ⇒ endpoint case EndpointFailedToActivate(_, cause) ⇒ throw cause - } + })(system.dispatcher) /** * Similar to awaitDeactivation but returns a future instead. @@ -63,10 +63,10 @@ trait Activation { * @param timeout the timeout of the Future */ def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] = - (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit] { + (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit]({ case EndpointDeActivated(_) ⇒ () case EndpointFailedToDeActivate(_, cause) ⇒ throw cause - } + })(system.dispatcher) } /** diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 95d1f6cc2d..309e84b032 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -17,10 +17,11 @@ import akka.pattern._ import scala.reflect.BeanProperty import scala.concurrent.util.duration._ import scala.concurrent.util.Duration +import scala.concurrent.{ ExecutionContext, Future } import scala.util.control.NonFatal import java.util.concurrent.{ TimeoutException, CountDownLatch } -import akka.camel.internal.CamelExchangeAdapter import akka.util.Timeout +import akka.camel.internal.CamelExchangeAdapter import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage } /** @@ -153,49 +154,31 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex * @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously */ private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter, callback: AsyncCallback): Boolean = { - - // these notify methods are just a syntax sugar - def notifyDoneSynchronously[A](a: A = null): Unit = callback.done(true) - def notifyDoneAsynchronously[A](a: A = null): Unit = callback.done(false) - - def message: CamelMessage = messageFor(exchange) - - if (exchange.isOutCapable) { //InOut - sendAsync(message, onComplete = forwardResponseTo(exchange) andThen notifyDoneAsynchronously) - } else { // inOnly - if (endpoint.autoack) { //autoAck - fireAndForget(message, exchange) - notifyDoneSynchronously() - true // done sync - } else { //manualAck - sendAsync(message, onComplete = forwardAckTo(exchange) andThen notifyDoneAsynchronously) - } + if (!exchange.isOutCapable && endpoint.autoack) { + fireAndForget(messageFor(exchange), exchange) + callback.done(true) + true // done sync + } else { + val action: PartialFunction[Either[Throwable, Any], Unit] = + if (exchange.isOutCapable) { + case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) + case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) + case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) + } else { + case Right(Ack) ⇒ () /* no response message to set */ + case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) + case Right(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) + case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) + case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) + } + val async = try actorFor(endpoint.path).ask(messageFor(exchange))(Timeout(endpoint.replyTimeout)) catch { case NonFatal(e) ⇒ Future.failed(e) } + implicit val ec = camel.system.dispatcher // FIXME which ExecutionContext should be used here? + async.onComplete(action andThen { _ ⇒ callback.done(false) }) + false } } - private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { - case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) - case Right(msg) ⇒ exchange.setResponse(CamelMessage.canonicalize(msg)) - case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) - case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) - } - - private def forwardAckTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = { - case Right(Ack) ⇒ { /* no response message to set */ } - case Right(failure: FailureResult) ⇒ exchange.setFailure(failure) - case Right(msg) ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path)))) - case Left(e: TimeoutException) ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint)))) - case Left(throwable) ⇒ exchange.setFailure(FailureResult(throwable)) - } - - private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = { - try { - actorFor(endpoint.path).ask(message)(Timeout(endpoint.replyTimeout)).onComplete(onComplete) - } catch { - case NonFatal(e) ⇒ onComplete(Left(e)) - } - false // Done async - } private def fireAndForget(message: CamelMessage, exchange: CamelExchangeAdapter): Unit = try { actorFor(endpoint.path) ! message } catch { case NonFatal(e) ⇒ exchange.setFailure(new FailureResult(e)) } diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 05576513cb..1a43d041c7 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -330,6 +330,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo } def prepareMocks(actor: ActorRef, message: CamelMessage = message, outCapable: Boolean) { + when(camel.system) thenReturn system when(actorEndpointPath.findActorIn(any[ActorSystem])) thenReturn Option(actor) when(exchange.toRequestMessage(any[Map[String, Any]])) thenReturn message when(exchange.isOutCapable) thenReturn outCapable diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index 08f7ca10fa..da612b57bf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -7,7 +7,7 @@ import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.util.duration._ +import scala.concurrent.util.duration._ import akka.actor.ActorSystem import akka.util.Deadline import java.util.concurrent.TimeoutException diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 65a36080ff..1244727d3f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -10,7 +10,7 @@ import akka.testkit._ import com.typesafe.config.ConfigFactory import akka.actor.Address import akka.remote.testconductor.{ RoleName, Direction } -import akka.util.duration._ +import scala.concurrent.util.duration._ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { val first = role("first") diff --git a/akka-docs/java/code/docs/actor/TypedActorDocTestBase.java b/akka-docs/java/code/docs/actor/TypedActorDocTestBase.java index 10dcb6f35f..ab2acd4db2 100644 --- a/akka-docs/java/code/docs/actor/TypedActorDocTestBase.java +++ b/akka-docs/java/code/docs/actor/TypedActorDocTestBase.java @@ -56,7 +56,7 @@ public class TypedActorDocTestBase { } public Future square(int i) { - return Futures.successful(i * i, TypedActor.dispatcher()); + return Futures.successful(i * i); } public Option squareNowPlease(int i) { diff --git a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java index 7ae1cf7e21..a64fa48615 100644 --- a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java @@ -251,7 +251,7 @@ public class UntypedActorDocTestBase { } }, system.dispatcher()); - pipe(transformed).to(actorC); + pipe(transformed, system.dispatcher()).to(actorC); //#ask-pipe system.shutdown(); } diff --git a/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java index 4efaccb553..b7338830e4 100644 --- a/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -151,7 +151,7 @@ public class FaultHandlingDocSample { public Progress apply(CurrentCount c) { return new Progress(100.0 * c.count / totalCount); } - }, getContext().dispatcher())) + }, getContext().dispatcher()), getContext().dispatcher()) .to(progressListener); } else { unhandled(msg); diff --git a/akka-docs/java/code/docs/future/FutureDocTestBase.java b/akka-docs/java/code/docs/future/FutureDocTestBase.java index 7e6e5b9fa8..e6c482a66d 100644 --- a/akka-docs/java/code/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/docs/future/FutureDocTestBase.java @@ -86,10 +86,10 @@ public class FutureDocTestBase { ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor(); //#diy-execution-context ExecutionContext ec = - ExecutionContext$.MODULE$.fromExecutorService(yourExecutorServiceGoesHere, (scala.Function1)(ExecutionContext$.MODULE$.fromExecutorService$default$2())); + ExecutionContext$.MODULE$.fromExecutorService(yourExecutorServiceGoesHere); //Use ec with your Futures - Future f1 = Futures.successful("foo", ec); + Future f1 = Futures.successful("foo"); // Then you shut the ExecutorService down somewhere at the end of your program/application. yourExecutorServiceGoesHere.shutdown(); @@ -219,8 +219,8 @@ public class FutureDocTestBase { @Test public void useSequence() throws Exception { List> source = new ArrayList>(); - source.add(Futures.successful(1, system.dispatcher())); - source.add(Futures.successful(2, system.dispatcher())); + source.add(Futures.successful(1)); + source.add(Futures.successful(2)); //#sequence final ExecutionContext ec = system.dispatcher(); @@ -271,8 +271,8 @@ public class FutureDocTestBase { @Test public void useFold() throws Exception { List> source = new ArrayList>(); - source.add(Futures.successful("a", system.dispatcher())); - source.add(Futures.successful("b", system.dispatcher())); + source.add(Futures.successful("a")); + source.add(Futures.successful("b")); //#fold final ExecutionContext ec = system.dispatcher(); @@ -295,8 +295,8 @@ public class FutureDocTestBase { @Test public void useReduce() throws Exception { List> source = new ArrayList>(); - source.add(Futures.successful("a", system.dispatcher())); - source.add(Futures.successful("b", system.dispatcher())); + source.add(Futures.successful("a")); + source.add(Futures.successful("b")); //#reduce final ExecutionContext ec = system.dispatcher(); @@ -319,10 +319,10 @@ public class FutureDocTestBase { public void useSuccessfulAndFailed() throws Exception { final ExecutionContext ec = system.dispatcher(); //#successful - Future future = Futures.successful("Yay!", ec); + Future future = Futures.successful("Yay!"); //#successful //#failed - Future otherFuture = Futures.failed(new IllegalArgumentException("Bang!"), ec); + Future otherFuture = Futures.failed(new IllegalArgumentException("Bang!")); //#failed Object result = Await.result(future, Duration.create(1, SECONDS)); assertEquals("Yay!", result); @@ -334,7 +334,7 @@ public class FutureDocTestBase { public void useFilter() throws Exception { //#filter final ExecutionContext ec = system.dispatcher(); - Future future1 = Futures.successful(4, ec); + Future future1 = Futures.successful(4); Future successfulFilter = future1.filter(Filter.filterOf(new Function() { public Boolean apply(Integer i) { return i % 2 == 0; @@ -362,7 +362,7 @@ public class FutureDocTestBase { public void useAndThen() { //#and-then final ExecutionContext ec = system.dispatcher(); - Future future1 = Futures.successful("value", ec).andThen(new OnComplete() { + Future future1 = Futures.successful("value").andThen(new OnComplete() { public void onComplete(Throwable failure, String result) { if (failure != null) sendToIssueTracker(failure); @@ -427,7 +427,7 @@ public class FutureDocTestBase { @Test public void useOnSuccessOnFailureAndOnComplete() throws Exception { { - Future future = Futures.successful("foo", system.dispatcher()); + Future future = Futures.successful("foo"); //#onSuccess final ExecutionContext ec = system.dispatcher(); @@ -444,7 +444,7 @@ public class FutureDocTestBase { //#onSuccess } { - Future future = Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); + Future future = Futures.failed(new IllegalStateException("OHNOES")); //#onFailure final ExecutionContext ec = system.dispatcher(); @@ -460,7 +460,7 @@ public class FutureDocTestBase { //#onFailure } { - Future future = Futures.successful("foo", system.dispatcher()); + Future future = Futures.successful("foo"); //#onComplete final ExecutionContext ec = system.dispatcher(); @@ -482,8 +482,8 @@ public class FutureDocTestBase { { //#zip final ExecutionContext ec = system.dispatcher(); - Future future1 = Futures.successful("foo", ec); - Future future2 = Futures.successful("bar", ec); + Future future1 = Futures.successful("foo"); + Future future2 = Futures.successful("bar"); Future future3 = future1.zip(future2).map(new Mapper, String>() { public String apply(scala.Tuple2 zipped) { return zipped._1() + " " + zipped._2(); @@ -497,10 +497,9 @@ public class FutureDocTestBase { { //#fallback-to - final ExecutionContext ec = system.dispatcher(); - Future future1 = Futures.failed(new IllegalStateException("OHNOES1"), ec); - Future future2 = Futures.failed(new IllegalStateException("OHNOES2"), ec); - Future future3 = Futures.successful("bar", ec); + Future future1 = Futures.failed(new IllegalStateException("OHNOES1")); + Future future2 = Futures.failed(new IllegalStateException("OHNOES2")); + Future future3 = Futures.successful("bar"); Future future4 = future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case String result = Await.result(future4, Duration.create(1, SECONDS)); assertEquals("bar", result); diff --git a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala index e77de916ff..a5021bf525 100644 --- a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala @@ -363,7 +363,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val actorA, actorB, actorC, actorD = system.actorOf(Props.empty) //#ask-pipeTo import akka.pattern.{ ask, pipe } - + import system.dispatcher // The ExecutionContext that will be used case class Result(x: Int, s: String, d: Double) case object Request diff --git a/akka-docs/scala/code/docs/actor/FaultHandlingDocSample.scala b/akka-docs/scala/code/docs/actor/FaultHandlingDocSample.scala index 12af8f0fce..1e4dc4f6ab 100644 --- a/akka-docs/scala/code/docs/actor/FaultHandlingDocSample.scala +++ b/akka-docs/scala/code/docs/actor/FaultHandlingDocSample.scala @@ -103,6 +103,7 @@ class Worker extends Actor with ActorLogging { counterService ! Increment(1) // Send current progress to the initial sender + import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext counterService ? GetCurrentCount map { case CurrentCount(_, count) ⇒ Progress(100.0 * count / totalCount) } pipeTo progressListener.get diff --git a/akka-docs/scala/code/docs/camel/Consumers.scala b/akka-docs/scala/code/docs/camel/Consumers.scala index 6bb0564a2a..178c340d63 100644 --- a/akka-docs/scala/code/docs/camel/Consumers.scala +++ b/akka-docs/scala/code/docs/camel/Consumers.scala @@ -1,5 +1,11 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + package docs.camel +import language.postfixOps + object Consumers { def foo = { //#Consumer1 @@ -53,7 +59,7 @@ object Consumers { { //#Consumer4 import akka.camel.{ CamelMessage, Consumer } - import akka.util.duration._ + import scala.concurrent.util.duration._ class Consumer4 extends Consumer { def endpointUri = "jetty:http://localhost:8877/camel/default" diff --git a/akka-docs/scala/code/docs/camel/CustomRoute.scala b/akka-docs/scala/code/docs/camel/CustomRoute.scala index c51d3e1fc4..e0782e834f 100644 --- a/akka-docs/scala/code/docs/camel/CustomRoute.scala +++ b/akka-docs/scala/code/docs/camel/CustomRoute.scala @@ -1,8 +1,14 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + package docs.camel import akka.camel.CamelMessage import akka.actor.Status.Failure +import language.existentials + object CustomRoute { { //#CustomRoute diff --git a/akka-docs/scala/code/docs/camel/Introduction.scala b/akka-docs/scala/code/docs/camel/Introduction.scala index a538536c62..ad4fbadf1a 100644 --- a/akka-docs/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/scala/code/docs/camel/Introduction.scala @@ -3,6 +3,8 @@ package docs.camel import akka.actor.{ Props, ActorSystem } import akka.camel.CamelExtension +import language.postfixOps + object Introduction { def foo = { //#Consumer-mina @@ -75,7 +77,7 @@ object Introduction { { //#CamelActivation import akka.camel.{ CamelMessage, Consumer } - import akka.util.duration._ + import scala.concurrent.util.duration._ class MyEndpoint extends Consumer { def endpointUri = "mina:tcp://localhost:6200?textline=true" diff --git a/akka-docs/scala/code/docs/camel/Producers.scala b/akka-docs/scala/code/docs/camel/Producers.scala index 58ee243d7b..46042669ab 100644 --- a/akka-docs/scala/code/docs/camel/Producers.scala +++ b/akka-docs/scala/code/docs/camel/Producers.scala @@ -1,6 +1,7 @@ package docs.camel import akka.camel.CamelExtension +import language.postfixOps object Producers { { @@ -16,7 +17,7 @@ object Producers { //#Producer1 //#AskProducer import akka.pattern.ask - import akka.util.duration._ + import scala.concurrent.util.duration._ implicit val timeout = Timeout(10 seconds) val system = ActorSystem("some-system") diff --git a/akka-docs/scala/code/docs/camel/QuartzExample.scala b/akka-docs/scala/code/docs/camel/QuartzExample.scala index 2735511d3c..f0ad04be57 100644 --- a/akka-docs/scala/code/docs/camel/QuartzExample.scala +++ b/akka-docs/scala/code/docs/camel/QuartzExample.scala @@ -1,34 +1,30 @@ package docs.camel object QuartzExample { + //#Quartz + import akka.actor.{ ActorSystem, Props } - { - //#Quartz - import akka.actor.{ ActorSystem, Props } + import akka.camel.{ Consumer } - import akka.camel.{ Consumer } + class MyQuartzActor extends Consumer { - class MyQuartzActor extends Consumer { + def endpointUri = "quartz://example?cron=0/2+*+*+*+*+?" - def endpointUri = "quartz://example?cron=0/2+*+*+*+*+?" + def receive = { - def receive = { + case msg ⇒ println("==============> received %s " format msg) - case msg ⇒ println("==============> received %s " format msg) + } // end receive - } // end receive + } // end MyQuartzActor - } // end MyQuartzActor + object MyQuartzActor { - object MyQuartzActor { - - def main(str: Array[String]) { - val system = ActorSystem("my-quartz-system") - system.actorOf(Props[MyQuartzActor]) - } // end main - - } // end MyQuartzActor - //#Quartz - } + def main(str: Array[String]) { + val system = ActorSystem("my-quartz-system") + system.actorOf(Props[MyQuartzActor]) + } // end main + } // end MyQuartzActor + //#Quartz } diff --git a/akka-docs/scala/code/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/docs/future/FutureDocSpec.scala index edc233ecdd..4f3d8e2fd3 100644 --- a/akka-docs/scala/code/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/docs/future/FutureDocSpec.scala @@ -37,7 +37,7 @@ object FutureDocSpec { class FutureDocSpec extends AkkaSpec { import FutureDocSpec._ - + import system.dispatcher "demonstrate usage custom ExecutionContext" in { val yourExecutorServiceGoesHere = java.util.concurrent.Executors.newSingleThreadExecutor() //#diy-execution-context @@ -150,7 +150,7 @@ class FutureDocSpec extends AkkaSpec { result must be(4) val failedFilter = future1.filter(_ % 2 == 1).recover { - case m: MatchError ⇒ 0 //When filter fails, it will have a MatchError + case m: NoSuchElementException ⇒ 0 //When filter fails, it will have a java.util.NoSuchElementException } val result2 = Await.result(failedFilter, 1 second) result2 must be(0) //Can only be 0 when there was a MatchError diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index b5a298e8ff..bb0401f896 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -337,7 +337,8 @@ object AkkaBuild extends Build { override lazy val settings = super.settings ++ buildSettings ++ Seq( resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/", - resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/", + //resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/", + resolvers += "Typesafe 2.10 Freshness" at "http://typesafe.artifactoryonline.com/typesafe/scala-fresh-2.10.x/", shellPrompt := { s => Project.extract(s).currentProject.id + " > " } ) @@ -498,9 +499,9 @@ object Dependencies { val slf4j = Seq(slf4jApi, Test.logback) - val agent = Seq(scalaStm, Test.scalatest, Test.junit) + val agent = Seq(scalaStm, scalaActors, Test.scalatest, Test.junit) - val transactor = Seq(scalaStm, Test.scalatest, Test.junit) + val transactor = Seq(scalaStm, scalaActors, Test.scalatest, Test.junit) val mailboxes = Seq(Test.scalatest, Test.junit) @@ -530,7 +531,9 @@ object Dependency { val camelCore = "org.apache.camel" % "camel-core" % "2.8.0" // ApacheV2 val netty = "io.netty" % "netty" % "3.5.1.Final" // ApacheV2 val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD - val scalaStm = "org.scala-tools" % v("scala-stm") % "0.5" // Modified BSD (Scala) + //val scalaStm = "org.scala-tools" % "scala-stm" % "0.5" // Modified BSD (Scala) + val scalaStm = "scala-stm" % "scala-stm" % "0.6-SNAPSHOT" //"0.5" // Modified BSD (Scala) + val scalaActors = "org.scala-lang" % "scala-actors" % "2.10.0-SNAPSHOT" val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT val zeroMQ = "org.zeromq" % v("zeromq-scala-binding") % "0.0.6" // ApacheV2 val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2