Removing pipeTo from akka.actor, renamed pipeTo(f,a) to pipe(f,a)
This commit is contained in:
parent
815245a133
commit
571af3d8d2
11 changed files with 24 additions and 38 deletions
|
|
@ -5,11 +5,10 @@ package akka.dataflow
|
||||||
|
|
||||||
import akka.actor.{ Actor, Props }
|
import akka.actor.{ Actor, Props }
|
||||||
import akka.dispatch.{ Future, Await }
|
import akka.dispatch.{ Future, Await }
|
||||||
import akka.actor.future2actor
|
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.testkit.DefaultTimeout
|
import akka.testkit.DefaultTimeout
|
||||||
import akka.pattern.ask
|
import akka.pattern.{ ask, pipeTo }
|
||||||
|
|
||||||
class Future2ActorSpec extends AkkaSpec with DefaultTimeout {
|
class Future2ActorSpec extends AkkaSpec with DefaultTimeout {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,15 +27,4 @@ package object actor {
|
||||||
val i = n.lastIndexOf('.')
|
val i = n.lastIndexOf('.')
|
||||||
n.substring(i + 1)
|
n.substring(i + 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
implicit def future2actor[T](f: akka.dispatch.Future[T]) = new {
|
|
||||||
def pipeTo(actor: ActorRef): this.type = {
|
|
||||||
f onComplete {
|
|
||||||
case Right(r) ⇒ actor ! r
|
|
||||||
case Left(f) ⇒ actor ! Status.Failure(f)
|
|
||||||
}
|
|
||||||
this
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -83,10 +83,10 @@ object Patterns {
|
||||||
* // apply some transformation (i.e. enrich with request info)
|
* // apply some transformation (i.e. enrich with request info)
|
||||||
* final Future<Object> transformed = f.map(new akka.japi.Function<Object, Object>() { ... });
|
* final Future<Object> transformed = f.map(new akka.japi.Function<Object, Object>() { ... });
|
||||||
* // send it on to the next stage
|
* // send it on to the next stage
|
||||||
* Patterns.pipeTo(transformed, nextActor);
|
* Patterns.pipe(transformed, nextActor);
|
||||||
* }}}
|
* }}}
|
||||||
*/
|
*/
|
||||||
def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = akka.pattern.pipeTo(future, actorRef)
|
def pipe[T](future: Future[T], recipient: ActorRef): Future[T] = akka.pattern.pipe(future, recipient)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
|
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import akka.dispatch.Future
|
||||||
object PipeToSupport {
|
object PipeToSupport {
|
||||||
|
|
||||||
class PipeableFuture[T](val future: Future[T]) {
|
class PipeableFuture[T](val future: Future[T]) {
|
||||||
def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipeTo(future, actorRef)
|
def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipe(future, actorRef)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -131,12 +131,10 @@ package object pattern {
|
||||||
*
|
*
|
||||||
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
||||||
*/
|
*/
|
||||||
def pipeTo[T](future: Future[T], actorRef: ActorRef): Future[T] = {
|
def pipe[T](future: Future[T], recipient: ActorRef): Future[T] =
|
||||||
future onComplete {
|
future onComplete {
|
||||||
case Right(r) ⇒ actorRef ! r
|
case Right(r) ⇒ recipient ! r
|
||||||
case Left(f) ⇒ actorRef ! Status.Failure(f)
|
case Left(f) ⇒ recipient ! Status.Failure(f)
|
||||||
}
|
|
||||||
future
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import akka.util.Duration
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import akka.pattern.AskSupport
|
import akka.pattern.{ AskSupport, pipeTo }
|
||||||
import scala.collection.JavaConversions.iterableAsScalaIterable
|
import scala.collection.JavaConversions.iterableAsScalaIterable
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -38,16 +38,16 @@ import akka.util.Duration;
|
||||||
import akka.actor.ActorTimeoutException;
|
import akka.actor.ActorTimeoutException;
|
||||||
//#import-gracefulStop
|
//#import-gracefulStop
|
||||||
|
|
||||||
//#import-askPipeTo
|
//#import-askPipe
|
||||||
import static akka.pattern.Patterns.ask;
|
import static akka.pattern.Patterns.ask;
|
||||||
import static akka.pattern.Patterns.pipeTo;
|
import static akka.pattern.Patterns.pipe;
|
||||||
import akka.dispatch.Future;
|
import akka.dispatch.Future;
|
||||||
import akka.dispatch.Futures;
|
import akka.dispatch.Futures;
|
||||||
import akka.util.Duration;
|
import akka.util.Duration;
|
||||||
import akka.util.Timeout;
|
import akka.util.Timeout;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
//#import-askPipeTo
|
//#import-askPipe
|
||||||
|
|
||||||
import akka.actor.Props;
|
import akka.actor.Props;
|
||||||
import akka.actor.UntypedActor;
|
import akka.actor.UntypedActor;
|
||||||
|
|
@ -224,12 +224,12 @@ public class UntypedActorDocTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void usePatternsAskPipeTo() {
|
public void usePatternsAskPipe() {
|
||||||
ActorSystem system = ActorSystem.create("MySystem");
|
ActorSystem system = ActorSystem.create("MySystem");
|
||||||
ActorRef actorA = system.actorOf(new Props(MyUntypedActor.class));
|
ActorRef actorA = system.actorOf(new Props(MyUntypedActor.class));
|
||||||
ActorRef actorB = system.actorOf(new Props(MyUntypedActor.class));
|
ActorRef actorB = system.actorOf(new Props(MyUntypedActor.class));
|
||||||
ActorRef actorC = system.actorOf(new Props(MyUntypedActor.class));
|
ActorRef actorC = system.actorOf(new Props(MyUntypedActor.class));
|
||||||
//#ask-pipeTo
|
//#ask-pipe
|
||||||
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
|
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
|
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
|
||||||
|
|
@ -247,8 +247,8 @@ public class UntypedActorDocTestBase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
pipeTo(transformed, actorC);
|
pipe(transformed, actorC);
|
||||||
//#ask-pipeTo
|
//#ask-pipe
|
||||||
system.shutdown();
|
system.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import static akka.japi.Util.manifest;
|
||||||
|
|
||||||
import static akka.actor.SupervisorStrategy.*;
|
import static akka.actor.SupervisorStrategy.*;
|
||||||
import static akka.pattern.Patterns.ask;
|
import static akka.pattern.Patterns.ask;
|
||||||
import static akka.pattern.Patterns.pipeTo;
|
import static akka.pattern.Patterns.pipe;
|
||||||
|
|
||||||
import static akka.docs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
|
import static akka.docs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
|
||||||
import static akka.docs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
|
import static akka.docs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
|
||||||
|
|
@ -145,7 +145,7 @@ public class FaultHandlingDocSample {
|
||||||
counterService.tell(new Increment(1), getSelf());
|
counterService.tell(new Increment(1), getSelf());
|
||||||
|
|
||||||
// Send current progress to the initial sender
|
// Send current progress to the initial sender
|
||||||
pipeTo(ask(counterService, GetCurrentCount, askTimeout)
|
pipe(ask(counterService, GetCurrentCount, askTimeout)
|
||||||
.mapTo(manifest(CurrentCount.class))
|
.mapTo(manifest(CurrentCount.class))
|
||||||
.map(new Mapper<CurrentCount, Progress>() {
|
.map(new Mapper<CurrentCount, Progress>() {
|
||||||
public Progress apply(CurrentCount c) {
|
public Progress apply(CurrentCount c) {
|
||||||
|
|
|
||||||
|
|
@ -323,15 +323,15 @@ Ask: Send-And-Receive-Future
|
||||||
The ``ask`` pattern involves actors as well as futures, hence it is offered as
|
The ``ask`` pattern involves actors as well as futures, hence it is offered as
|
||||||
a use pattern rather than a method on :class:`ActorRef`:
|
a use pattern rather than a method on :class:`ActorRef`:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#import-askPipeTo
|
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#import-askPipe
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#ask-pipeTo
|
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#ask-pipe
|
||||||
|
|
||||||
This example demonstrates ``ask`` together with the ``pipeTo`` pattern on
|
This example demonstrates ``ask`` together with the ``pipe`` pattern on
|
||||||
futures, because this is likely to be a common combination. Please note that
|
futures, because this is likely to be a common combination. Please note that
|
||||||
all of the above is completely non-blocking and asynchronous: ``ask`` produces
|
all of the above is completely non-blocking and asynchronous: ``ask`` produces
|
||||||
a :class:`Future`, two of which are composed into a new future using the
|
a :class:`Future`, two of which are composed into a new future using the
|
||||||
:meth:`Futures.sequence` and :meth:`map` methods and then ``pipeTo`` installs
|
:meth:`Futures.sequence` and :meth:`map` methods and then ``pipe`` installs
|
||||||
an ``onComplete``-handler on the future to effect the submission of the
|
an ``onComplete``-handler on the future to effect the submission of the
|
||||||
aggregated :class:`Result` to another actor.
|
aggregated :class:`Result` to another actor.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -314,7 +314,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
"using pattern ask / pipeTo" in {
|
"using pattern ask / pipeTo" in {
|
||||||
val actorA, actorB, actorC, actorD = system.actorOf(Props.empty)
|
val actorA, actorB, actorC, actorD = system.actorOf(Props.empty)
|
||||||
//#ask-pipeTo
|
//#ask-pipeTo
|
||||||
import akka.pattern.{ ask, pipeTo }
|
import akka.pattern.{ ask, pipeTo, pipe }
|
||||||
|
|
||||||
case class Result(x: Int, s: String, d: Double)
|
case class Result(x: Int, s: String, d: Double)
|
||||||
case object Request
|
case object Request
|
||||||
|
|
@ -329,7 +329,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
} yield Result(x, s, d)
|
} yield Result(x, s, d)
|
||||||
|
|
||||||
f pipeTo actorD // .. or ..
|
f pipeTo actorD // .. or ..
|
||||||
pipeTo(f, actorD)
|
pipe(f, actorD)
|
||||||
//#ask-pipeTo
|
//#ask-pipeTo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import akka.util.duration._
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.event.LoggingReceive
|
import akka.event.LoggingReceive
|
||||||
import akka.pattern.ask
|
import akka.pattern.{ ask, pipeTo }
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
//#imports
|
//#imports
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue