diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 9f65728517..50e6bb909c 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -100,4 +100,22 @@ object Patterns { * }}} */ def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis)).asInstanceOf[Future[AnyRef]] + + /** + * Register an onComplete callback on this [[akka.dispatch.Future]] to send + * the result to the given actor reference. Returns the original Future to + * allow method chaining. + * + * Recommended usage example: + * + * {{{ + * val f = ask(worker, request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see [[akka.dispatch.Future]] for a description of `flow`] + */ + def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = akka.pattern.pipeTo(future, actorRef) } diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala new file mode 100644 index 0000000000..6827946902 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.pattern + +import akka.actor.ActorRef +import akka.dispatch.Future + +object PipeToSupport { + + class PipeableFuture[T](val future: Future[T]) { + def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipeTo(future, actorRef) + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index 5bc426a67d..500beca807 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -69,4 +69,39 @@ package object pattern { case _ ⇒ throw new IllegalArgumentException("incompatible ActorRef " + actorRef) } + /** + * Import this implicit conversion to gain the `pipeTo` method on [[akka.dispatch.Future]]: + * + * {{{ + * import akka.pattern.pipeTo + * + * Future { doExpensiveCalc() } pipeTo nextActor + * }}} + */ + implicit def pipeTo[T](future: Future[T]): PipeToSupport.PipeableFuture[T] = new PipeToSupport.PipeableFuture(future) + + /** + * Register an onComplete callback on this [[akka.dispatch.Future]] to send + * the result to the given actor reference. Returns the original Future to + * allow method chaining. + * + * Recommended usage example: + * + * {{{ + * val f = ask(worker, request)(timeout) + * flow { + * EnrichedRequest(request, f()) + * } pipeTo nextActor + * }}} + * + * [see [[akka.dispatch.Future]] for a description of `flow`] + */ + def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = { + future onComplete { + case Right(r) ⇒ actorRef ! r + case Left(f) ⇒ actorRef ! akka.actor.Status.Failure(f) + } + future + } + }