diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala index eabe00a3b2..26f92d8a6d 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala @@ -8,7 +8,7 @@ import akka.dispatch.{ Future, Await } import akka.util.duration._ import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout -import akka.pattern.{ ask, pipeTo } +import akka.pattern.{ ask, pipe } class Future2ActorSpec extends AkkaSpec with DefaultTimeout { diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index d585d88e13..348d472e89 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -6,7 +6,7 @@ package akka.pattern object Patterns { import akka.actor.{ ActorRef, ActorSystem } import akka.dispatch.Future - import akka.pattern.{ ask ⇒ scalaAsk } + import akka.pattern.{ ask ⇒ scalaAsk, pipe ⇒ scalaPipe } import akka.util.{ Timeout, Duration } /** @@ -86,7 +86,7 @@ object Patterns { * Patterns.pipe(transformed, nextActor); * }}} */ - def pipe[T](future: Future[T], recipient: ActorRef): Future[T] = akka.pattern.pipe(future, recipient) + def pipe[T](future: Future[T], recipient: ActorRef): Future[T] = scalaPipe(future) pipeTo recipient /** * Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when @@ -98,7 +98,6 @@ object Patterns { * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] * is completed with failure [[akka.actor.ActorTimeoutException]]. */ - def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = { + def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] - } } diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala index fef4a91ee8..66c584867a 100644 --- a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala @@ -9,7 +9,16 @@ import akka.actor.{ Status, ActorRef } trait PipeToSupport { final class PipeableFuture[T](val future: Future[T]) { - def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipe(future, actorRef) + def pipeTo(recipient: ActorRef): Future[T] = + future onComplete { + case Right(r) ⇒ recipient ! r + case Left(f) ⇒ recipient ! Status.Failure(f) + } + + def to(recipient: ActorRef): PipeableFuture[T] = { + pipeTo(recipient) + this + } } /** @@ -21,28 +30,5 @@ trait PipeToSupport { * Future { doExpensiveCalc() } pipeTo nextActor * }}} */ - implicit def pipeTo[T](future: Future[T]): PipeableFuture[T] = new PipeableFuture(future) - - /** - * Register an onComplete callback on this [[akka.dispatch.Future]] to send - * the result to the given actor reference. Returns the original Future to - * allow method chaining. - * - * Recommended usage example: - * - * {{{ - * val f = ask(worker, request)(timeout) - * flow { - * EnrichedRequest(request, f()) - * } pipeTo nextActor - * }}} - * - * [see [[akka.dispatch.Future]] for a description of `flow`] - */ - def pipe[T](future: Future[T], recipient: ActorRef): Future[T] = - future onComplete { - case Right(r) ⇒ recipient ! r - case Left(f) ⇒ recipient ! Status.Failure(f) - } - + implicit def pipe[T](future: Future[T]): PipeableFuture[T] = new PipeableFuture(future) } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index d223bf2a32..ec4786a4c0 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -42,4 +42,4 @@ import akka.util.{ Timeout, Duration } */ package object pattern extends PipeToSupport with AskSupport with GracefulStopSupport { -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 46dcbde8d7..e3c349cb89 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -10,7 +10,7 @@ import akka.util.Duration import akka.util.duration._ import com.typesafe.config.Config import akka.config.ConfigurationException -import akka.pattern.{ AskSupport, pipeTo } +import akka.pattern.pipe import scala.collection.JavaConversions.iterableAsScalaIterable /** diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 98fa19aba2..55a205746f 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -337,7 +337,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "using pattern ask / pipeTo" in { val actorA, actorB, actorC, actorD = system.actorOf(Props.empty) //#ask-pipeTo - import akka.pattern.{ ask, pipeTo, pipe } + import akka.pattern.{ ask, pipe } case class Result(x: Int, s: String, d: Double) case object Request @@ -352,7 +352,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { } yield Result(x, s, d) f pipeTo actorD // .. or .. - pipe(f, actorD) + pipe(f) to actorD //#ask-pipeTo } diff --git a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSample.scala b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSample.scala index 09f32eee91..d08bcb53b2 100644 --- a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSample.scala +++ b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSample.scala @@ -11,7 +11,7 @@ import akka.util.duration._ import akka.util.Duration import akka.util.Timeout import akka.event.LoggingReceive -import akka.pattern.{ ask, pipeTo } +import akka.pattern.{ ask, pipe } import com.typesafe.config.ConfigFactory //#imports