chore: Make timeoutCompletionStage accept java Duration. (#2063)

This commit is contained in:
He-Pin(kerr) 2025-08-24 02:09:56 +08:00 committed by GitHub
parent 8ca24f6a08
commit 442ecd76c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 10 additions and 8 deletions

View file

@ -536,7 +536,7 @@ public class PatternsTest extends JUnitSuite {
delayedStage.toCompletableFuture().get(3, SECONDS);
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
assertEquals("Timeout of 200 milliseconds expired", e.getCause().getMessage());
assertEquals("Timeout of PT0.2S expired", e.getCause().getMessage());
}
}

View file

@ -126,8 +126,8 @@ trait FutureTimeoutSupport {
* if the provided value is not completed within the specified duration.
* @since 1.2.0
*/
@deprecated("Use `CompletableFuture#orTimeout instead.", "Pekko 2.0.0")
def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])(
@deprecated("Use `CompletableFuture#orTimeout instead.", "2.0.0")
def timeoutCompletionStage[T](duration: java.time.Duration, using: Scheduler)(value: => CompletionStage[T])(
implicit ec: ExecutionContext): CompletionStage[T] = {
val stage: CompletionStage[T] =
try value
@ -138,10 +138,12 @@ trait FutureTimeoutSupport {
stage
} else {
val p = new CompletableFuture[T]
val timeout = using.scheduleOnce(duration) {
p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
stage.toCompletableFuture.cancel(true)
}
val timeout = using.scheduleOnce(duration,
() => {
p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
stage.toCompletableFuture.cancel(true)
()
})
stage.handle[Unit]((v: T, ex: Throwable) => {
timeout.cancel()
if (v != null) p.complete(v)

View file

@ -480,7 +480,7 @@ object Patterns {
scheduler: Scheduler,
context: ExecutionContext,
value: Callable[CompletionStage[T]]): CompletionStage[T] =
timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context)
timeoutCompletionStage(duration, scheduler)(value.call())(context)
/**
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]