Cleaning up the code
This commit is contained in:
parent
aca5693ce6
commit
75e90cccdf
7 changed files with 20 additions and 35 deletions
|
|
@ -8,7 +8,7 @@ import akka.dispatch.{ Future, Await }
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.testkit.DefaultTimeout
|
import akka.testkit.DefaultTimeout
|
||||||
import akka.pattern.{ ask, pipeTo }
|
import akka.pattern.{ ask, pipe }
|
||||||
|
|
||||||
class Future2ActorSpec extends AkkaSpec with DefaultTimeout {
|
class Future2ActorSpec extends AkkaSpec with DefaultTimeout {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.pattern
|
||||||
object Patterns {
|
object Patterns {
|
||||||
import akka.actor.{ ActorRef, ActorSystem }
|
import akka.actor.{ ActorRef, ActorSystem }
|
||||||
import akka.dispatch.Future
|
import akka.dispatch.Future
|
||||||
import akka.pattern.{ ask ⇒ scalaAsk }
|
import akka.pattern.{ ask ⇒ scalaAsk, pipe ⇒ scalaPipe }
|
||||||
import akka.util.{ Timeout, Duration }
|
import akka.util.{ Timeout, Duration }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -86,7 +86,7 @@ object Patterns {
|
||||||
* Patterns.pipe(transformed, nextActor);
|
* Patterns.pipe(transformed, nextActor);
|
||||||
* }}}
|
* }}}
|
||||||
*/
|
*/
|
||||||
def pipe[T](future: Future[T], recipient: ActorRef): Future[T] = akka.pattern.pipe(future, recipient)
|
def pipe[T](future: Future[T], recipient: ActorRef): Future[T] = scalaPipe(future) pipeTo recipient
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
|
* Returns a [[akka.dispatch.Future]] that will be completed with success (value `true`) when
|
||||||
|
|
@ -98,7 +98,6 @@ object Patterns {
|
||||||
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
|
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
|
||||||
* is completed with failure [[akka.actor.ActorTimeoutException]].
|
* is completed with failure [[akka.actor.ActorTimeoutException]].
|
||||||
*/
|
*/
|
||||||
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = {
|
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] =
|
||||||
akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]
|
akka.pattern.gracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,16 @@ import akka.actor.{ Status, ActorRef }
|
||||||
trait PipeToSupport {
|
trait PipeToSupport {
|
||||||
|
|
||||||
final class PipeableFuture[T](val future: Future[T]) {
|
final class PipeableFuture[T](val future: Future[T]) {
|
||||||
def pipeTo(actorRef: ActorRef): Future[T] = akka.pattern.pipe(future, actorRef)
|
def pipeTo(recipient: ActorRef): Future[T] =
|
||||||
|
future onComplete {
|
||||||
|
case Right(r) ⇒ recipient ! r
|
||||||
|
case Left(f) ⇒ recipient ! Status.Failure(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
def to(recipient: ActorRef): PipeableFuture[T] = {
|
||||||
|
pipeTo(recipient)
|
||||||
|
this
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -21,28 +30,5 @@ trait PipeToSupport {
|
||||||
* Future { doExpensiveCalc() } pipeTo nextActor
|
* Future { doExpensiveCalc() } pipeTo nextActor
|
||||||
* }}}
|
* }}}
|
||||||
*/
|
*/
|
||||||
implicit def pipeTo[T](future: Future[T]): PipeableFuture[T] = new PipeableFuture(future)
|
implicit def pipe[T](future: Future[T]): PipeableFuture[T] = new PipeableFuture(future)
|
||||||
|
|
||||||
/**
|
|
||||||
* Register an onComplete callback on this [[akka.dispatch.Future]] to send
|
|
||||||
* the result to the given actor reference. Returns the original Future to
|
|
||||||
* allow method chaining.
|
|
||||||
*
|
|
||||||
* <b>Recommended usage example:</b>
|
|
||||||
*
|
|
||||||
* {{{
|
|
||||||
* val f = ask(worker, request)(timeout)
|
|
||||||
* flow {
|
|
||||||
* EnrichedRequest(request, f())
|
|
||||||
* } pipeTo nextActor
|
|
||||||
* }}}
|
|
||||||
*
|
|
||||||
* [see [[akka.dispatch.Future]] for a description of `flow`]
|
|
||||||
*/
|
|
||||||
def pipe[T](future: Future[T], recipient: ActorRef): Future[T] =
|
|
||||||
future onComplete {
|
|
||||||
case Right(r) ⇒ recipient ! r
|
|
||||||
case Left(f) ⇒ recipient ! Status.Failure(f)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -10,7 +10,7 @@ import akka.util.Duration
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import akka.pattern.{ AskSupport, pipeTo }
|
import akka.pattern.pipe
|
||||||
import scala.collection.JavaConversions.iterableAsScalaIterable
|
import scala.collection.JavaConversions.iterableAsScalaIterable
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -337,7 +337,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
"using pattern ask / pipeTo" in {
|
"using pattern ask / pipeTo" in {
|
||||||
val actorA, actorB, actorC, actorD = system.actorOf(Props.empty)
|
val actorA, actorB, actorC, actorD = system.actorOf(Props.empty)
|
||||||
//#ask-pipeTo
|
//#ask-pipeTo
|
||||||
import akka.pattern.{ ask, pipeTo, pipe }
|
import akka.pattern.{ ask, pipe }
|
||||||
|
|
||||||
case class Result(x: Int, s: String, d: Double)
|
case class Result(x: Int, s: String, d: Double)
|
||||||
case object Request
|
case object Request
|
||||||
|
|
@ -352,7 +352,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
} yield Result(x, s, d)
|
} yield Result(x, s, d)
|
||||||
|
|
||||||
f pipeTo actorD // .. or ..
|
f pipeTo actorD // .. or ..
|
||||||
pipe(f, actorD)
|
pipe(f) to actorD
|
||||||
//#ask-pipeTo
|
//#ask-pipeTo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import akka.util.duration._
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.event.LoggingReceive
|
import akka.event.LoggingReceive
|
||||||
import akka.pattern.{ ask, pipeTo }
|
import akka.pattern.{ ask, pipe }
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
//#imports
|
//#imports
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue