Change ask timeout from Timeout to Duration in typed javadsl. (#25975)
* !typ Change the ActorContext#ask in javadsl to accept a Duration instead of Timeout. * !typ Change the ActorContext#setReceiveTimeout's parameter name from d to receiveTimeout.
This commit is contained in:
parent
f7a95b5228
commit
68dc288b08
10 changed files with 20 additions and 22 deletions
|
|
@ -14,6 +14,7 @@ import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.scalatest.junit.JUnitSuite;
|
import org.scalatest.junit.JUnitSuite;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class ActorContextAskTest extends JUnitSuite {
|
public class ActorContextAskTest extends JUnitSuite {
|
||||||
|
|
@ -42,7 +43,7 @@ public class ActorContextAskTest extends JUnitSuite {
|
||||||
final Behavior<Object> snitch = Behaviors.setup((ActorContext<Object> context) -> {
|
final Behavior<Object> snitch = Behaviors.setup((ActorContext<Object> context) -> {
|
||||||
context.ask(Pong.class,
|
context.ask(Pong.class,
|
||||||
pingPong,
|
pingPong,
|
||||||
new Timeout(3, TimeUnit.SECONDS),
|
Duration.ofSeconds(3),
|
||||||
(ActorRef<Pong> ref) -> new Ping(ref),
|
(ActorRef<Pong> ref) -> new Ping(ref),
|
||||||
(pong, exception) -> {
|
(pong, exception) -> {
|
||||||
if (pong != null) return pong;
|
if (pong != null) return pong;
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,6 @@ import akka.Done;
|
||||||
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.scalatest.junit.JUnitSuite;
|
import org.scalatest.junit.JUnitSuite;
|
||||||
import akka.util.Timeout;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import akka.actor.typed.*;
|
import akka.actor.typed.*;
|
||||||
|
|
@ -42,7 +41,7 @@ public class WatchTest extends JUnitSuite {
|
||||||
static final class CustomTerminationMessage implements Message {
|
static final class CustomTerminationMessage implements Message {
|
||||||
}
|
}
|
||||||
|
|
||||||
final Timeout timeout = Timeout.create(Duration.ofSeconds(5));
|
final Duration timeout = Duration.ofSeconds(5);
|
||||||
|
|
||||||
final Behavior<Stop> exitingActor = receive((context, message) -> {
|
final Behavior<Stop> exitingActor = receive((context, message) -> {
|
||||||
System.out.println("Stopping!");
|
System.out.println("Stopping!");
|
||||||
|
|
|
||||||
|
|
@ -8,17 +8,16 @@ import akka.actor.typed.ActorRef;
|
||||||
import akka.actor.typed.ActorSystem;
|
import akka.actor.typed.ActorSystem;
|
||||||
import akka.actor.typed.javadsl.AskPattern;
|
import akka.actor.typed.javadsl.AskPattern;
|
||||||
import akka.actor.typed.javadsl.Behaviors;
|
import akka.actor.typed.javadsl.Behaviors;
|
||||||
import akka.util.Timeout;
|
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
public class ReceptionistApiTest {
|
public class ReceptionistApiTest {
|
||||||
|
|
||||||
public void compileOnlyApiTest() {
|
public void compileOnlyApiTest() {
|
||||||
// some dummy prerequisites
|
// some dummy prerequisites
|
||||||
final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS);
|
final Duration timeout = Duration.ofSeconds(3);
|
||||||
final ActorRef<String> service = null;
|
final ActorRef<String> service = null;
|
||||||
final ServiceKey<String> key = ServiceKey.create(String.class, "id");
|
final ServiceKey<String> key = ServiceKey.create(String.class, "id");
|
||||||
final ActorSystem<Void> system = null;
|
final ActorSystem<Void> system = null;
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ import akka.actor.typed.Behavior;
|
||||||
import akka.actor.typed.Props;
|
import akka.actor.typed.Props;
|
||||||
import akka.actor.typed.javadsl.*;
|
import akka.actor.typed.javadsl.*;
|
||||||
import akka.actor.testkit.typed.javadsl.TestProbe;
|
import akka.actor.testkit.typed.javadsl.TestProbe;
|
||||||
import akka.util.Timeout;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.scalatest.junit.JUnitSuite;
|
import org.scalatest.junit.JUnitSuite;
|
||||||
import scala.concurrent.Await;
|
import scala.concurrent.Await;
|
||||||
|
|
@ -382,7 +381,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
|
|
||||||
// asking someone requires a timeout, if the timeout hits without response
|
// asking someone requires a timeout, if the timeout hits without response
|
||||||
// the ask is failed with a TimeoutException
|
// the ask is failed with a TimeoutException
|
||||||
final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS);
|
final Duration timeout = Duration.ofSeconds(3);
|
||||||
|
|
||||||
context.ask(
|
context.ask(
|
||||||
HalResponse.class,
|
HalResponse.class,
|
||||||
|
|
@ -448,7 +447,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
GiveMeCookies::new,
|
GiveMeCookies::new,
|
||||||
// asking someone requires a timeout and a scheduler, if the timeout hits without response
|
// asking someone requires a timeout and a scheduler, if the timeout hits without response
|
||||||
// the ask is failed with a TimeoutException
|
// the ask is failed with a TimeoutException
|
||||||
Timeout.apply(3, TimeUnit.SECONDS),
|
Duration.ofSeconds(3),
|
||||||
system.scheduler());
|
system.scheduler());
|
||||||
|
|
||||||
result.whenComplete((cookies, failure) -> {
|
result.whenComplete((cookies, failure) -> {
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ public class SpawnProtocolDocTest {
|
||||||
//#system-spawn
|
//#system-spawn
|
||||||
final ActorSystem<SpawnProtocol> system =
|
final ActorSystem<SpawnProtocol> system =
|
||||||
ActorSystem.create(HelloWorldMain.main, "hello");
|
ActorSystem.create(HelloWorldMain.main, "hello");
|
||||||
final Timeout timeout = Timeout.create(Duration.ofSeconds(3));
|
final Duration timeout = Duration.ofSeconds(3);
|
||||||
|
|
||||||
CompletionStage<ActorRef<HelloWorld.Greet>> greeter = AskPattern.ask(
|
CompletionStage<ActorRef<HelloWorld.Greet>> greeter = AskPattern.ask(
|
||||||
system,
|
system,
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
package akka.actor.typed
|
package akka.actor.typed
|
||||||
package internal
|
package internal
|
||||||
|
|
||||||
|
import java.time.Duration
|
||||||
import java.util.function.{ Function ⇒ JFunction }
|
import java.util.function.{ Function ⇒ JFunction }
|
||||||
import java.util.ArrayList
|
import java.util.ArrayList
|
||||||
import java.util.Optional
|
import java.util.Optional
|
||||||
|
|
@ -16,7 +17,6 @@ import scala.reflect.ClassTag
|
||||||
import scala.util.Failure
|
import scala.util.Failure
|
||||||
import scala.util.Success
|
import scala.util.Success
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.util.OptionVal
|
import akka.util.OptionVal
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
|
@ -90,11 +90,11 @@ import akka.util.JavaDurationConverters._
|
||||||
}
|
}
|
||||||
|
|
||||||
// Java API impl
|
// Java API impl
|
||||||
def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Timeout, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
|
def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
|
||||||
this.ask(target)(createRequest.apply) {
|
this.ask(target)(createRequest.apply) {
|
||||||
case Success(message) ⇒ applyToResponse.apply(message, null)
|
case Success(message) ⇒ applyToResponse.apply(message, null)
|
||||||
case Failure(ex) ⇒ applyToResponse.apply(null.asInstanceOf[Res], ex)
|
case Failure(ex) ⇒ applyToResponse.apply(null.asInstanceOf[Res], ex)
|
||||||
}(responseTimeout, ClassTag[Res](resClass))
|
}(responseTimeout.asScala, ClassTag[Res](resClass))
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] override def spawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] =
|
private[akka] override def spawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] =
|
||||||
|
|
|
||||||
|
|
@ -186,7 +186,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] {
|
||||||
* *Warning*: This method is not thread-safe and must not be accessed from threads other
|
* *Warning*: This method is not thread-safe and must not be accessed from threads other
|
||||||
* than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
|
* than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
|
||||||
*/
|
*/
|
||||||
def setReceiveTimeout(d: Duration, msg: T): Unit
|
def setReceiveTimeout(timeout: Duration, msg: T): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cancel the sending of receive timeout notifications.
|
* Cancel the sending of receive timeout notifications.
|
||||||
|
|
@ -275,7 +275,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] {
|
||||||
def ask[Req, Res](
|
def ask[Req, Res](
|
||||||
resClass: Class[Res],
|
resClass: Class[Res],
|
||||||
target: RecipientRef[Req],
|
target: RecipientRef[Req],
|
||||||
responseTimeout: Timeout,
|
responseTimeout: Duration,
|
||||||
createRequest: java.util.function.Function[ActorRef[Res], Req],
|
createRequest: java.util.function.Function[ActorRef[Res], Req],
|
||||||
applyToResponse: BiFunction[Res, Throwable, T]): Unit
|
applyToResponse: BiFunction[Res, Throwable, T]): Unit
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,12 +5,13 @@
|
||||||
package akka.actor.typed
|
package akka.actor.typed
|
||||||
package javadsl
|
package javadsl
|
||||||
|
|
||||||
|
import java.time.Duration
|
||||||
import java.util.concurrent.CompletionStage
|
import java.util.concurrent.CompletionStage
|
||||||
|
|
||||||
import akka.actor.Scheduler
|
import akka.actor.Scheduler
|
||||||
import akka.actor.typed.scaladsl.AskPattern._
|
import akka.actor.typed.scaladsl.AskPattern._
|
||||||
import akka.japi.function.{ Function ⇒ JFunction }
|
import akka.japi.function.{ Function ⇒ JFunction }
|
||||||
import akka.util.Timeout
|
import akka.util.JavaDurationConverters._
|
||||||
|
|
||||||
import scala.compat.java8.FutureConverters._
|
import scala.compat.java8.FutureConverters._
|
||||||
|
|
||||||
|
|
@ -29,6 +30,6 @@ import scala.compat.java8.FutureConverters._
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
object AskPattern {
|
object AskPattern {
|
||||||
def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] =
|
def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
|
||||||
(actor.?(message.apply)(timeout, scheduler)).toJava
|
(actor.?(message.apply)(timeout.asScala, scheduler)).toJava
|
||||||
}
|
}
|
||||||
|
|
@ -167,7 +167,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { this: akka.acto
|
||||||
* *Warning*: This method is not thread-safe and must not be accessed from threads other
|
* *Warning*: This method is not thread-safe and must not be accessed from threads other
|
||||||
* than the ordinary actor message processing thread, such as [[scala.concurrent.Future]] callbacks.
|
* than the ordinary actor message processing thread, such as [[scala.concurrent.Future]] callbacks.
|
||||||
*/
|
*/
|
||||||
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
|
def setReceiveTimeout(timeout: FiniteDuration, msg: T): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cancel the sending of receive timeout notifications.
|
* Cancel the sending of receive timeout notifications.
|
||||||
|
|
|
||||||
|
|
@ -13,11 +13,10 @@ import akka.persistence.typed.EventAdapter;
|
||||||
import akka.actor.testkit.typed.javadsl.TestInbox;
|
import akka.actor.testkit.typed.javadsl.TestInbox;
|
||||||
import akka.persistence.typed.PersistenceId;
|
import akka.persistence.typed.PersistenceId;
|
||||||
import akka.persistence.typed.SideEffect;
|
import akka.persistence.typed.SideEffect;
|
||||||
import akka.util.Timeout;
|
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static akka.actor.typed.javadsl.AskPattern.ask;
|
import static akka.actor.typed.javadsl.AskPattern.ask;
|
||||||
|
|
||||||
|
|
@ -266,7 +265,7 @@ public class PersistentActorCompileOnlyTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
static ActorRef<Request> sideEffectProcessor = TestInbox.<Request>create().getRef();
|
static ActorRef<Request> sideEffectProcessor = TestInbox.<Request>create().getRef();
|
||||||
static Timeout timeout = new Timeout(1, TimeUnit.SECONDS);
|
static Duration timeout = Duration.ofSeconds(1);
|
||||||
|
|
||||||
private static void performSideEffect(ActorRef<AcknowledgeSideEffect> sender, int correlationId, String data, Scheduler scheduler) {
|
private static void performSideEffect(ActorRef<AcknowledgeSideEffect> sender, int correlationId, String data, Scheduler scheduler) {
|
||||||
CompletionStage<Response> what = ask(sideEffectProcessor, (ActorRef<Response> ar) -> new Request(correlationId, data, ar), timeout, scheduler);
|
CompletionStage<Response> what = ask(sideEffectProcessor, (ActorRef<Response> ar) -> new Request(correlationId, data, ar), timeout, scheduler);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue