feat: Add Pattern timeout support (#1424)

This commit is contained in:
He-Pin(kerr) 2025-02-28 11:37:26 +08:00 committed by GitHub
parent abc18a5cea
commit 466d4a500c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 142 additions and 5 deletions

View file

@ -19,6 +19,7 @@ import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.util.Timeout;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
@ -36,6 +37,7 @@ import static org.apache.pekko.pattern.Patterns.ask;
import static org.apache.pekko.pattern.Patterns.pipe;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com> */
public class PatternsTest extends JUnitSuite {
@ -485,6 +487,51 @@ public class PatternsTest extends JUnitSuite {
assertEquals(expected, actual);
}
@Test
public void testCompletedStageWithTimeout() throws Exception {
final String expected = "Hello";
final CompletionStage<String> delayedStage =
Patterns.timeout(
Duration.ofMillis(200),
system.scheduler(),
ec,
() -> CompletableFuture.completedFuture(expected));
final String actual = delayedStage.toCompletableFuture().get(3, SECONDS);
assertEquals(expected, actual);
}
@Test
public void testFailedCompletedStageWithTimeout() throws Exception {
final CompletionStage<String> delayedStage =
Patterns.timeout(
Duration.ofMillis(200),
system.scheduler(),
ec,
() -> {
CompletableFuture<String> f = new CompletableFuture<>();
f.completeExceptionally(new IllegalStateException("Illegal!"));
return f;
});
try {
delayedStage.toCompletableFuture().get(3, SECONDS);
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof IllegalStateException);
assertEquals("Illegal!", e.getCause().getMessage());
}
}
@Test
public void testCompletedWithTimeout() throws Exception {
final CompletionStage<String> delayedStage =
Patterns.timeout(Duration.ofMillis(200), system.scheduler(), ec, CompletableFuture::new);
try {
delayedStage.toCompletableFuture().get(3, SECONDS);
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
assertEquals("Timeout of 200 milliseconds expired", e.getCause().getMessage());
}
}
@Test
public void testGracefulStop() throws Exception {
ActorRef target = system.actorOf(Props.create(StopActor.class));

View file

@ -13,10 +13,8 @@
package org.apache.pekko.pattern
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.{ Await, ExecutionContextExecutor, Future, Promise, TimeoutException }
import scala.concurrent.duration._
import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ PekkoSpec, TestLatch }
@ -76,4 +74,23 @@ class PatternSpec extends PekkoSpec {
intercept[IllegalStateException] { Await.result(r, remainingOrDefault) }.getMessage should ===("Mexico")
}
}
"pattern.timeout" must {
"be completed successfully eventually" in {
val f = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.successful(5))
Await.result(f, remainingOrDefault) should ===(5)
}
"be completed abnormally eventually" in {
val f =
pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.failed(new IllegalStateException("ABC")))
intercept[IllegalStateException] { Await.result(f, remainingOrDefault) }.getMessage should ===("ABC")
}
"be completed with a TimeoutException if not completed within the specified time" in {
val f = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.never)
intercept[TimeoutException] { Await.result(f, remainingOrDefault) }
}
}
}

View file

@ -13,8 +13,7 @@
package org.apache.pekko.pattern
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeoutException }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.FiniteDuration
@ -87,4 +86,66 @@ trait FutureTimeoutSupport {
}
p
}
/**
* Returns a [[scala.concurrent.Future]] that will be completed with a [[TimeoutException]]
* if the provided value is not completed within the specified duration.
* @since 1.2.0
*/
def timeout[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
implicit ec: ExecutionContext): Future[T] = {
val future =
try value
catch {
case NonFatal(t) => Future.failed(t)
}
future.value match {
case Some(_) => future
case None => // not completed yet
val p = Promise[T]()
val timeout = using.scheduleOnce(duration) {
p.tryFailure(new TimeoutException(s"Timeout of $duration expired"))
if (future.isInstanceOf[CompletableFuture[T @unchecked]]) {
future.asInstanceOf[CompletableFuture[T]]
.toCompletableFuture
.cancel(true)
}
}
future.onComplete { result =>
timeout.cancel()
p.tryComplete(result)
}(pekko.dispatch.ExecutionContexts.parasitic)
p.future
}
}
/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[TimeoutException]]
* if the provided value is not completed within the specified duration.
* @since 1.2.0
*/
def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])(
implicit ec: ExecutionContext): CompletionStage[T] = {
val stage: CompletionStage[T] =
try value
catch {
case NonFatal(t) => Futures.failedCompletionStage(t)
}
if (stage.toCompletableFuture.isDone) {
stage
} else {
val p = new CompletableFuture[T]
val timeout = using.scheduleOnce(duration) {
p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
stage.toCompletableFuture.cancel(true)
}
stage.handle[Unit]((v: T, ex: Throwable) => {
timeout.cancel()
if (v != null) p.complete(v)
if (ex != null) p.completeExceptionally(ex)
})
p
}
}
}

View file

@ -465,6 +465,18 @@ object Patterns {
value: Callable[CompletionStage[T]]): CompletionStage[T] =
afterCompletionStage(duration.asScala, scheduler)(value.call())(context)
/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[java.util.concurrent.TimeoutException]]
* if the provided value is not completed within the specified duration.
* @since 1.2.0
*/
def timeout[T](
duration: java.time.Duration,
scheduler: Scheduler,
context: ExecutionContext,
value: Callable[CompletionStage[T]]): CompletionStage[T] =
timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context)
/**
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration.