Making it green

This commit is contained in:
Viktor Klang 2012-07-22 21:40:09 +02:00
parent e5b3fd00a2
commit aa4ad6f3c3
20 changed files with 155 additions and 104 deletions

View file

@ -5,11 +5,63 @@
package akka.dispatch
import scala.runtime.{ BoxedUnit, AbstractPartialFunction }
import akka.japi.{ Function JFunc, Option JOption }
import scala.concurrent.{ Future, Promise, ExecutionContext }
import akka.japi.{ Function JFunc, Option JOption, Procedure }
import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutor, ExecutionContextExecutorService }
import java.lang.{ Iterable JIterable }
import java.util.{ LinkedList JLinkedList }
import java.util.concurrent.{ ExecutionException, Callable, TimeoutException }
import java.util.concurrent.{ Executor, ExecutorService, ExecutionException, Callable, TimeoutException }
/**
* ExecutionContexts is the Java API for ExecutionContexts
*/
object ExecutionContexts {
/**
* Returns a new ExecutionContextExecutor which will delegate execution to the underlying Executor,
* and which will use the default error reporter.
*
* @param executor the Executor which will be used for the ExecutionContext
* @return a new ExecutionContext
*/
def fromExecutor(executor: Executor): ExecutionContextExecutor =
ExecutionContext.fromExecutor(executor)
/**
* Returns a new ExecutionContextExecutor which will delegate execution to the underlying Executor,
* and which will use the provided error reporter.
*
* @param executor the Executor which will be used for the ExecutionContext
* @param errorReporter a Procedure that will log any exceptions passed to it
* @return a new ExecutionContext
*/
def fromExecutor(executor: Executor, errorReporter: Procedure[Throwable]): ExecutionContextExecutor =
ExecutionContext.fromExecutor(executor, errorReporter.apply)
/**
* Returns a new ExecutionContextExecutorService which will delegate execution to the underlying ExecutorService,
* and which will use the default error reporter.
*
* @param executor the ExecutorService which will be used for the ExecutionContext
* @return a new ExecutionContext
*/
def fromExecutorService(executorService: ExecutorService): ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(executorService)
/**
* Returns a new ExecutionContextExecutorService which will delegate execution to the underlying ExecutorService,
* and which will use the provided error reporter.
*
* @param executor the ExecutorService which will be used for the ExecutionContext
* @param errorReporter a Procedure that will log any exceptions passed to it
* @return a new ExecutionContext
*/
def fromExecutorService(executorService: ExecutorService, errorReporter: Procedure[Throwable]): ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(executorService, errorReporter.apply)
/**
* @return a reference to the global ExecutionContext
*/
def global(): ExecutionContext = ExecutionContext.global
}
/**
* Futures is the Java API for Futures and Promises

View file

@ -54,6 +54,7 @@ class AgentSpec extends AkkaSpec {
}
"maintain order between alter and alterOff" in {
import system.dispatcher
val l1, l2 = new CountDownLatch(1)
val agent = Agent("a")

View file

@ -52,10 +52,10 @@ trait Activation {
* @param timeout the timeout for the Future
*/
def activationFutureFor(endpoint: ActorRef, timeout: Duration): Future[ActorRef] =
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef] {
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointActivated(_) endpoint
case EndpointFailedToActivate(_, cause) throw cause
}
})(system.dispatcher)
/**
* Similar to awaitDeactivation but returns a future instead.
@ -63,10 +63,10 @@ trait Activation {
* @param timeout the timeout of the Future
*/
def deactivationFutureFor(endpoint: ActorRef, timeout: Duration): Future[Unit] =
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit] {
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[Unit]({
case EndpointDeActivated(_) ()
case EndpointFailedToDeActivate(_, cause) throw cause
}
})(system.dispatcher)
}
/**

View file

@ -17,10 +17,11 @@ import akka.pattern._
import scala.reflect.BeanProperty
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import java.util.concurrent.{ TimeoutException, CountDownLatch }
import akka.camel.internal.CamelExchangeAdapter
import akka.util.Timeout
import akka.camel.internal.CamelExchangeAdapter
import akka.camel.{ ActorNotRegisteredException, ConsumerConfig, Camel, Ack, FailureResult, CamelMessage }
/**
@ -153,48 +154,30 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
* @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously
*/
private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter, callback: AsyncCallback): Boolean = {
// these notify methods are just a syntax sugar
def notifyDoneSynchronously[A](a: A = null): Unit = callback.done(true)
def notifyDoneAsynchronously[A](a: A = null): Unit = callback.done(false)
def message: CamelMessage = messageFor(exchange)
if (exchange.isOutCapable) { //InOut
sendAsync(message, onComplete = forwardResponseTo(exchange) andThen notifyDoneAsynchronously)
} else { // inOnly
if (endpoint.autoack) { //autoAck
fireAndForget(message, exchange)
notifyDoneSynchronously()
if (!exchange.isOutCapable && endpoint.autoack) {
fireAndForget(messageFor(exchange), exchange)
callback.done(true)
true // done sync
} else { //manualAck
sendAsync(message, onComplete = forwardAckTo(exchange) andThen notifyDoneAsynchronously)
}
}
}
private def forwardResponseTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = {
} else {
val action: PartialFunction[Either[Throwable, Any], Unit] =
if (exchange.isOutCapable) {
case Right(failure: FailureResult) exchange.setFailure(failure)
case Right(msg) exchange.setResponse(CamelMessage.canonicalize(msg))
case Left(e: TimeoutException) exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
case Left(throwable) exchange.setFailure(FailureResult(throwable))
}
private def forwardAckTo(exchange: CamelExchangeAdapter): PartialFunction[Either[Throwable, Any], Unit] = {
case Right(Ack) { /* no response message to set */ }
} else {
case Right(Ack) () /* no response message to set */
case Right(failure: FailureResult) exchange.setFailure(failure)
case Right(msg) exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path))))
case Left(e: TimeoutException) exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
case Left(throwable) exchange.setFailure(FailureResult(throwable))
}
private def sendAsync(message: CamelMessage, onComplete: PartialFunction[Either[Throwable, Any], Unit]): Boolean = {
try {
actorFor(endpoint.path).ask(message)(Timeout(endpoint.replyTimeout)).onComplete(onComplete)
} catch {
case NonFatal(e) onComplete(Left(e))
val async = try actorFor(endpoint.path).ask(messageFor(exchange))(Timeout(endpoint.replyTimeout)) catch { case NonFatal(e) Future.failed(e) }
implicit val ec = camel.system.dispatcher // FIXME which ExecutionContext should be used here?
async.onComplete(action andThen { _ callback.done(false) })
false
}
false // Done async
}
private def fireAndForget(message: CamelMessage, exchange: CamelExchangeAdapter): Unit =

View file

@ -330,6 +330,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
}
def prepareMocks(actor: ActorRef, message: CamelMessage = message, outCapable: Boolean) {
when(camel.system) thenReturn system
when(actorEndpointPath.findActorIn(any[ActorSystem])) thenReturn Option(actor)
when(exchange.toRequestMessage(any[Map[String, Any]])) thenReturn message
when(exchange.isOutCapable) thenReturn outCapable

View file

@ -7,7 +7,7 @@ import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import scala.concurrent.util.duration._
import akka.actor.ActorSystem
import akka.util.Deadline
import java.util.concurrent.TimeoutException

View file

@ -10,7 +10,7 @@ import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.actor.Address
import akka.remote.testconductor.{ RoleName, Direction }
import akka.util.duration._
import scala.concurrent.util.duration._
object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig {
val first = role("first")

View file

@ -56,7 +56,7 @@ public class TypedActorDocTestBase {
}
public Future<Integer> square(int i) {
return Futures.successful(i * i, TypedActor.dispatcher());
return Futures.successful(i * i);
}
public Option<Integer> squareNowPlease(int i) {

View file

@ -251,7 +251,7 @@ public class UntypedActorDocTestBase {
}
}, system.dispatcher());
pipe(transformed).to(actorC);
pipe(transformed, system.dispatcher()).to(actorC);
//#ask-pipe
system.shutdown();
}

View file

@ -151,7 +151,7 @@ public class FaultHandlingDocSample {
public Progress apply(CurrentCount c) {
return new Progress(100.0 * c.count / totalCount);
}
}, getContext().dispatcher()))
}, getContext().dispatcher()), getContext().dispatcher())
.to(progressListener);
} else {
unhandled(msg);

View file

@ -86,10 +86,10 @@ public class FutureDocTestBase {
ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor();
//#diy-execution-context
ExecutionContext ec =
ExecutionContext$.MODULE$.fromExecutorService(yourExecutorServiceGoesHere, (scala.Function1<java.lang.Throwable,scala.runtime.BoxedUnit>)(ExecutionContext$.MODULE$.fromExecutorService$default$2()));
ExecutionContext$.MODULE$.fromExecutorService(yourExecutorServiceGoesHere);
//Use ec with your Futures
Future<String> f1 = Futures.successful("foo", ec);
Future<String> f1 = Futures.successful("foo");
// Then you shut the ExecutorService down somewhere at the end of your program/application.
yourExecutorServiceGoesHere.shutdown();
@ -219,8 +219,8 @@ public class FutureDocTestBase {
@Test
public void useSequence() throws Exception {
List<Future<Integer>> source = new ArrayList<Future<Integer>>();
source.add(Futures.successful(1, system.dispatcher()));
source.add(Futures.successful(2, system.dispatcher()));
source.add(Futures.successful(1));
source.add(Futures.successful(2));
//#sequence
final ExecutionContext ec = system.dispatcher();
@ -271,8 +271,8 @@ public class FutureDocTestBase {
@Test
public void useFold() throws Exception {
List<Future<String>> source = new ArrayList<Future<String>>();
source.add(Futures.successful("a", system.dispatcher()));
source.add(Futures.successful("b", system.dispatcher()));
source.add(Futures.successful("a"));
source.add(Futures.successful("b"));
//#fold
final ExecutionContext ec = system.dispatcher();
@ -295,8 +295,8 @@ public class FutureDocTestBase {
@Test
public void useReduce() throws Exception {
List<Future<String>> source = new ArrayList<Future<String>>();
source.add(Futures.successful("a", system.dispatcher()));
source.add(Futures.successful("b", system.dispatcher()));
source.add(Futures.successful("a"));
source.add(Futures.successful("b"));
//#reduce
final ExecutionContext ec = system.dispatcher();
@ -319,10 +319,10 @@ public class FutureDocTestBase {
public void useSuccessfulAndFailed() throws Exception {
final ExecutionContext ec = system.dispatcher();
//#successful
Future<String> future = Futures.successful("Yay!", ec);
Future<String> future = Futures.successful("Yay!");
//#successful
//#failed
Future<String> otherFuture = Futures.failed(new IllegalArgumentException("Bang!"), ec);
Future<String> otherFuture = Futures.failed(new IllegalArgumentException("Bang!"));
//#failed
Object result = Await.result(future, Duration.create(1, SECONDS));
assertEquals("Yay!", result);
@ -334,7 +334,7 @@ public class FutureDocTestBase {
public void useFilter() throws Exception {
//#filter
final ExecutionContext ec = system.dispatcher();
Future<Integer> future1 = Futures.successful(4, ec);
Future<Integer> future1 = Futures.successful(4);
Future<Integer> successfulFilter = future1.filter(Filter.filterOf(new Function<Integer, Boolean>() {
public Boolean apply(Integer i) {
return i % 2 == 0;
@ -362,7 +362,7 @@ public class FutureDocTestBase {
public void useAndThen() {
//#and-then
final ExecutionContext ec = system.dispatcher();
Future<String> future1 = Futures.successful("value", ec).andThen(new OnComplete<String>() {
Future<String> future1 = Futures.successful("value").andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null)
sendToIssueTracker(failure);
@ -427,7 +427,7 @@ public class FutureDocTestBase {
@Test
public void useOnSuccessOnFailureAndOnComplete() throws Exception {
{
Future<String> future = Futures.successful("foo", system.dispatcher());
Future<String> future = Futures.successful("foo");
//#onSuccess
final ExecutionContext ec = system.dispatcher();
@ -444,7 +444,7 @@ public class FutureDocTestBase {
//#onSuccess
}
{
Future<String> future = Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher());
Future<String> future = Futures.failed(new IllegalStateException("OHNOES"));
//#onFailure
final ExecutionContext ec = system.dispatcher();
@ -460,7 +460,7 @@ public class FutureDocTestBase {
//#onFailure
}
{
Future<String> future = Futures.successful("foo", system.dispatcher());
Future<String> future = Futures.successful("foo");
//#onComplete
final ExecutionContext ec = system.dispatcher();
@ -482,8 +482,8 @@ public class FutureDocTestBase {
{
//#zip
final ExecutionContext ec = system.dispatcher();
Future<String> future1 = Futures.successful("foo", ec);
Future<String> future2 = Futures.successful("bar", ec);
Future<String> future1 = Futures.successful("foo");
Future<String> future2 = Futures.successful("bar");
Future<String> future3 = future1.zip(future2).map(new Mapper<scala.Tuple2<String, String>, String>() {
public String apply(scala.Tuple2<String, String> zipped) {
return zipped._1() + " " + zipped._2();
@ -497,10 +497,9 @@ public class FutureDocTestBase {
{
//#fallback-to
final ExecutionContext ec = system.dispatcher();
Future<String> future1 = Futures.failed(new IllegalStateException("OHNOES1"), ec);
Future<String> future2 = Futures.failed(new IllegalStateException("OHNOES2"), ec);
Future<String> future3 = Futures.successful("bar", ec);
Future<String> future1 = Futures.failed(new IllegalStateException("OHNOES1"));
Future<String> future2 = Futures.failed(new IllegalStateException("OHNOES2"));
Future<String> future3 = Futures.successful("bar");
Future<String> future4 = future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case
String result = Await.result(future4, Duration.create(1, SECONDS));
assertEquals("bar", result);

View file

@ -363,7 +363,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
val actorA, actorB, actorC, actorD = system.actorOf(Props.empty)
//#ask-pipeTo
import akka.pattern.{ ask, pipe }
import system.dispatcher // The ExecutionContext that will be used
case class Result(x: Int, s: String, d: Double)
case object Request

View file

@ -103,6 +103,7 @@ class Worker extends Actor with ActorLogging {
counterService ! Increment(1)
// Send current progress to the initial sender
import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext
counterService ? GetCurrentCount map {
case CurrentCount(_, count) Progress(100.0 * count / totalCount)
} pipeTo progressListener.get

View file

@ -1,5 +1,11 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.camel
import language.postfixOps
object Consumers {
def foo = {
//#Consumer1
@ -53,7 +59,7 @@ object Consumers {
{
//#Consumer4
import akka.camel.{ CamelMessage, Consumer }
import akka.util.duration._
import scala.concurrent.util.duration._
class Consumer4 extends Consumer {
def endpointUri = "jetty:http://localhost:8877/camel/default"

View file

@ -1,8 +1,14 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.camel
import akka.camel.CamelMessage
import akka.actor.Status.Failure
import language.existentials
object CustomRoute {
{
//#CustomRoute

View file

@ -3,6 +3,8 @@ package docs.camel
import akka.actor.{ Props, ActorSystem }
import akka.camel.CamelExtension
import language.postfixOps
object Introduction {
def foo = {
//#Consumer-mina
@ -75,7 +77,7 @@ object Introduction {
{
//#CamelActivation
import akka.camel.{ CamelMessage, Consumer }
import akka.util.duration._
import scala.concurrent.util.duration._
class MyEndpoint extends Consumer {
def endpointUri = "mina:tcp://localhost:6200?textline=true"

View file

@ -1,6 +1,7 @@
package docs.camel
import akka.camel.CamelExtension
import language.postfixOps
object Producers {
{
@ -16,7 +17,7 @@ object Producers {
//#Producer1
//#AskProducer
import akka.pattern.ask
import akka.util.duration._
import scala.concurrent.util.duration._
implicit val timeout = Timeout(10 seconds)
val system = ActorSystem("some-system")

View file

@ -1,8 +1,6 @@
package docs.camel
object QuartzExample {
{
//#Quartz
import akka.actor.{ ActorSystem, Props }
@ -29,6 +27,4 @@ object QuartzExample {
} // end MyQuartzActor
//#Quartz
}
}

View file

@ -37,7 +37,7 @@ object FutureDocSpec {
class FutureDocSpec extends AkkaSpec {
import FutureDocSpec._
import system.dispatcher
"demonstrate usage custom ExecutionContext" in {
val yourExecutorServiceGoesHere = java.util.concurrent.Executors.newSingleThreadExecutor()
//#diy-execution-context
@ -150,7 +150,7 @@ class FutureDocSpec extends AkkaSpec {
result must be(4)
val failedFilter = future1.filter(_ % 2 == 1).recover {
case m: MatchError 0 //When filter fails, it will have a MatchError
case m: NoSuchElementException 0 //When filter fails, it will have a java.util.NoSuchElementException
}
val result2 = Await.result(failedFilter, 1 second)
result2 must be(0) //Can only be 0 when there was a MatchError

View file

@ -337,7 +337,8 @@ object AkkaBuild extends Build {
override lazy val settings = super.settings ++ buildSettings ++ Seq(
resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/",
resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/",
//resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/",
resolvers += "Typesafe 2.10 Freshness" at "http://typesafe.artifactoryonline.com/typesafe/scala-fresh-2.10.x/",
shellPrompt := { s => Project.extract(s).currentProject.id + " > " }
)
@ -498,9 +499,9 @@ object Dependencies {
val slf4j = Seq(slf4jApi, Test.logback)
val agent = Seq(scalaStm, Test.scalatest, Test.junit)
val agent = Seq(scalaStm, scalaActors, Test.scalatest, Test.junit)
val transactor = Seq(scalaStm, Test.scalatest, Test.junit)
val transactor = Seq(scalaStm, scalaActors, Test.scalatest, Test.junit)
val mailboxes = Seq(Test.scalatest, Test.junit)
@ -530,7 +531,9 @@ object Dependency {
val camelCore = "org.apache.camel" % "camel-core" % "2.8.0" // ApacheV2
val netty = "io.netty" % "netty" % "3.5.1.Final" // ApacheV2
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD
val scalaStm = "org.scala-tools" % v("scala-stm") % "0.5" // Modified BSD (Scala)
//val scalaStm = "org.scala-tools" % "scala-stm" % "0.5" // Modified BSD (Scala)
val scalaStm = "scala-stm" % "scala-stm" % "0.6-SNAPSHOT" //"0.5" // Modified BSD (Scala)
val scalaActors = "org.scala-lang" % "scala-actors" % "2.10.0-SNAPSHOT"
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT
val zeroMQ = "org.zeromq" % v("zeromq-scala-binding") % "0.0.6" // ApacheV2
val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2