Java time duration support for javadsl #24646

java.time.Duration support in all jdocs java file, the persistence module and the distributed-data
This commit is contained in:
Guy Youansi 2018-06-26 15:41:30 +02:00 committed by Johan Andrén
parent ced08fcb57
commit a4951e0ce7
30 changed files with 214 additions and 141 deletions

View file

@ -8,7 +8,6 @@ import akka.actor.*;
import akka.dispatch.Futures; import akka.dispatch.Futures;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.TestLatch;
import akka.testkit.TestProbe; import akka.testkit.TestProbe;
import akka.util.Timeout; import akka.util.Timeout;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -17,11 +16,11 @@ import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.time.Duration;
import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe; import static akka.pattern.Patterns.pipe;
@ -75,15 +74,17 @@ public class PatternsTest extends JUnitSuite {
@Test @Test
public void useAsk() throws Exception { public void useAsk() throws Exception {
ActorRef testActor = system.actorOf(Props.create(JavaAPITestActor.class), "test"); ActorRef testActor = system.actorOf(Props.create(JavaAPITestActor.class), "test");
scala.concurrent.duration.Duration timeout = scala.concurrent.duration.Duration.create(3, "seconds");
assertEquals("Ask should return expected answer", assertEquals("Ask should return expected answer",
JavaAPITestActor.ANSWER, Await.result(ask(testActor, "hey!", 3000), Duration.create(3, "seconds"))); JavaAPITestActor.ANSWER, Await.result(ask(testActor, "hey!", 3000), timeout));
} }
@Test @Test
public void useAskWithActorSelection() throws Exception { public void useAskWithActorSelection() throws Exception {
scala.concurrent.duration.Duration timeout = scala.concurrent.duration.Duration.create(3, "seconds");
ActorRef testActor = system.actorOf(Props.create(JavaAPITestActor.class), "test2"); ActorRef testActor = system.actorOf(Props.create(JavaAPITestActor.class), "test2");
ActorSelection selection = system.actorSelection("/user/test2"); ActorSelection selection = system.actorSelection("/user/test2");
ActorIdentity id = (ActorIdentity) Await.result(ask(selection, new Identify("yo!"), 3000), Duration.create(3, "seconds")); ActorIdentity id = (ActorIdentity) Await.result(ask(selection, new Identify("yo!"), 3000), timeout);
assertEquals("Ask (Identify) should return the proper ActorIdentity", testActor, id.getActorRef().get()); assertEquals("Ask (Identify) should return the proper ActorIdentity", testActor, id.getActorRef().get());
} }
@ -251,7 +252,7 @@ public class PatternsTest extends JUnitSuite {
Patterns.retry( Patterns.retry(
() -> Futures.successful(expected), () -> Futures.successful(expected),
3, 3,
Duration.apply(200, "millis"), scala.concurrent.duration.Duration.apply(200, "millis"),
system.scheduler(), ec); system.scheduler(), ec);
String actual = Await.result(retriedFuture, FiniteDuration.apply(3, SECONDS)); String actual = Await.result(retriedFuture, FiniteDuration.apply(3, SECONDS));
@ -268,7 +269,7 @@ public class PatternsTest extends JUnitSuite {
PatternsCS.retry( PatternsCS.retry(
attempt, attempt,
3, 3,
java.time.Duration.ofMillis(200), Duration.ofMillis(200),
system.scheduler(), ec); system.scheduler(), ec);
final String actual = retriedStage.toCompletableFuture().get(3, SECONDS); final String actual = retriedStage.toCompletableFuture().get(3, SECONDS);
@ -281,13 +282,13 @@ public class PatternsTest extends JUnitSuite {
Future<String> delayedFuture = Patterns Future<String> delayedFuture = Patterns
.after( .after(
Duration.create(200, "millis"), scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(), system.scheduler(),
ec, ec,
failedCallable); failedCallable);
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); Await.result(resultFuture, scala.concurrent.duration.FiniteDuration.apply(3, SECONDS));
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
@ -296,7 +297,7 @@ public class PatternsTest extends JUnitSuite {
Future<String> delayedFuture = Patterns Future<String> delayedFuture = Patterns
.after( .after(
Duration.create(200, "millis"), scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(), system.scheduler(),
ec, ec,
failedFuture); failedFuture);
@ -311,7 +312,7 @@ public class PatternsTest extends JUnitSuite {
Future<String> delayedFuture = Patterns Future<String> delayedFuture = Patterns
.after( .after(
Duration.create(200, "millis"), scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(), system.scheduler(),
ec, ec,
() -> Futures.successful(expected)); () -> Futures.successful(expected));
@ -328,7 +329,7 @@ public class PatternsTest extends JUnitSuite {
Future<String> delayedFuture = Patterns Future<String> delayedFuture = Patterns
.after( .after(
Duration.create(200, "millis"), scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(), system.scheduler(),
ec, ec,
Futures.successful(expected)); Futures.successful(expected));
@ -345,7 +346,7 @@ public class PatternsTest extends JUnitSuite {
Future<String> delayedFuture = Patterns Future<String> delayedFuture = Patterns
.after( .after(
Duration.create(200, "millis"), scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(), system.scheduler(),
ec, ec,
Futures.successful("world")); Futures.successful("world"));
@ -368,7 +369,7 @@ public class PatternsTest extends JUnitSuite {
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = PatternsCS
.after( .after(
java.time.Duration.ofMillis(200), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
ec, ec,
failedCallable); failedCallable);
@ -386,7 +387,7 @@ public class PatternsTest extends JUnitSuite {
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = PatternsCS
.after( .after(
Duration.create(200, "millis"), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
ec, ec,
failedFuture); failedFuture);
@ -405,7 +406,7 @@ public class PatternsTest extends JUnitSuite {
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = PatternsCS
.after( .after(
java.time.Duration.ofMillis(200), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
ec, ec,
cf); cf);
@ -422,7 +423,7 @@ public class PatternsTest extends JUnitSuite {
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = PatternsCS
.after( .after(
Duration.create(200, "millis"), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
ec, ec,
f); f);
@ -439,7 +440,7 @@ public class PatternsTest extends JUnitSuite {
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = PatternsCS
.after( .after(
Duration.create(200, "millis"), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
ec, ec,
f); f);
@ -463,7 +464,7 @@ public class PatternsTest extends JUnitSuite {
@Test @Test
public void testCSGracefulStop() throws Exception { public void testCSGracefulStop() throws Exception {
ActorRef target = system.actorOf(Props.create(StopActor.class)); ActorRef target = system.actorOf(Props.create(StopActor.class));
CompletionStage<Boolean> result = PatternsCS.gracefulStop(target, java.time.Duration.ofMillis(200)); CompletionStage<Boolean> result = PatternsCS.gracefulStop(target, Duration.ofMillis(200));
Boolean actual = result.toCompletableFuture().get(3, SECONDS); Boolean actual = result.toCompletableFuture().get(3, SECONDS);
assertEquals(true, actual); assertEquals(true, actual);

View file

@ -6,8 +6,6 @@ package akka.util;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import org.scalatest.junit.JUnitSuiteLike;
import scala.concurrent.duration.Duration;
import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertEquals;

View file

@ -11,12 +11,13 @@ import akka.Done;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.Duration;
import akka.util.Timeout; import akka.util.Timeout;
import org.junit.Test; import org.junit.Test;
import akka.actor.typed.*; import akka.actor.typed.*;
import java.time.Duration;
import static akka.Done.done; import static akka.Done.done;
import static akka.actor.typed.javadsl.Behaviors.*; import static akka.actor.typed.javadsl.Behaviors.*;
@ -41,8 +42,7 @@ public class WatchTest extends JUnitSuite {
static final class CustomTerminationMessage implements Message { static final class CustomTerminationMessage implements Message {
} }
// final FiniteDuration fiveSeconds = FiniteDuration.create(5, TimeUnit.SECONDS); final Timeout timeout = Timeout.create(Duration.ofSeconds(5));
final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final Behavior<Stop> exitingActor = receive((ctx, msg) -> { final Behavior<Stop> exitingActor = receive((ctx, msg) -> {
System.out.println("Stopping!"); System.out.println("Stopping!");

View file

@ -16,7 +16,7 @@ import static akka.actor.SupervisorStrategy.Directive;
* <pre> * <pre>
* &#64;Override * &#64;Override
* private static SupervisorStrategy strategy = * private static SupervisorStrategy strategy =
* new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder. * new OneForOneStrategy(10, Duration.ofMinutes(1), DeciderBuilder.
* match(ArithmeticException.class, e -&gt; resume()). * match(ArithmeticException.class, e -&gt; resume()).
* match(NullPointerException.class, e -&gt; restart()). * match(NullPointerException.class, e -&gt; restart()).
* match(IllegalArgumentException.class, e -&gt; stop()). * match(IllegalArgumentException.class, e -&gt; stop()).

View file

@ -8,9 +8,11 @@ import language.implicitConversions
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.collection.mutable import scala.collection.mutable
import akka.routing.{ Deafen, Listen, Listeners } import akka.routing.{ Deafen, Listen, Listeners }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.util.JavaDurationConverters
object FSM { object FSM {
@ -178,6 +180,18 @@ object FSM {
} // so we do not have to break source/binary compat. } // so we do not have to break source/binary compat.
// TODO: Can be removed once we can break State#timeout signature to `Option[Duration]` // TODO: Can be removed once we can break State#timeout signature to `Option[Duration]`
/**
* JAVA API: Modify state transition descriptor to include a state timeout for the
* next state. This timeout overrides any default timeout set for the next
* state.
*
* Use Duration.Inf to deactivate an existing timeout.
*/
def forMax(timeout: java.time.Duration): State[S, D] = {
import JavaDurationConverters._
forMax(timeout.asScala)
}
/** /**
* Send reply to sender of the current message, if available. * Send reply to sender of the current message, if available.
* *

View file

@ -9,6 +9,7 @@ import language.implicitConversions
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.util.JavaDurationConverters._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
@ -398,24 +399,48 @@ case class AllForOneStrategy(
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) =
this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider)) this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: java.time.Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) =
this(maxNrOfRetries, withinTimeRange.asScala, loggingEnabled)(SupervisorStrategy.makeDecider(decider))
/** /**
* Java API * Java API
*/ */
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: java.time.Duration, decider: SupervisorStrategy.JDecider) =
this(maxNrOfRetries, withinTimeRange.asScala)(SupervisorStrategy.makeDecider(decider))
/** /**
* Java API * Java API
*/ */
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: java.time.Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange.asScala)(SupervisorStrategy.makeDecider(trapExit))
/** /**
* Java API: compatible with lambda expressions * Java API: compatible with lambda expressions
*/ */
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.Decider) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.Decider) =
this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange)(decider) this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange)(decider)
/**
* Java API: compatible with lambda expressions
*/
def this(maxNrOfRetries: Int, withinTimeRange: java.time.Duration, decider: SupervisorStrategy.Decider) =
this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange.asScala)(decider)
/** /**
* Java API: compatible with lambda expressions * Java API: compatible with lambda expressions
*/ */
@ -471,24 +496,48 @@ case class OneForOneStrategy(
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) =
this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider)) this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: java.time.Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) =
this(maxNrOfRetries, withinTimeRange.asScala, loggingEnabled)(SupervisorStrategy.makeDecider(decider))
/** /**
* Java API * Java API
*/ */
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: java.time.Duration, decider: SupervisorStrategy.JDecider) =
this(maxNrOfRetries, withinTimeRange.asScala)(SupervisorStrategy.makeDecider(decider))
/** /**
* Java API * Java API
*/ */
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: java.time.Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange.asScala)(SupervisorStrategy.makeDecider(trapExit))
/** /**
* Java API: compatible with lambda expressions * Java API: compatible with lambda expressions
*/ */
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.Decider) = def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.Decider) =
this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange)(decider) this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange)(decider)
/**
* Java API: compatible with lambda expressions
*/
def this(maxNrOfRetries: Int, withinTimeRange: java.time.Duration, decider: SupervisorStrategy.Decider) =
this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange.asScala)(decider)
def this(loggingEnabled: Boolean, decider: SupervisorStrategy.Decider) = def this(loggingEnabled: Boolean, decider: SupervisorStrategy.Decider) =
this(loggingEnabled = loggingEnabled)(decider) this(loggingEnabled = loggingEnabled)(decider)

View file

@ -10,12 +10,12 @@ import akka.testkit.javadsl.EventFilter;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.util.Timeout; import akka.util.Timeout;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.Duration;
import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;

View file

@ -52,6 +52,7 @@ import akka.annotation.InternalApi
import scala.collection.immutable.TreeSet import scala.collection.immutable.TreeSet
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
import scala.annotation.varargs import scala.annotation.varargs
import akka.util.JavaDurationConverters._
object ReplicatorSettings { object ReplicatorSettings {
@ -290,11 +291,27 @@ object Replicator {
} }
final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency { final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency {
require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1") require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1")
/**
* Java API
*/
def this(n: Int, timeout: java.time.Duration) = this(n, timeout.asScala)
} }
final case class ReadMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency { final case class ReadMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency {
def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap)
/**
* Java API
*/
def this(timeout: java.time.Duration) = this(timeout.asScala, DefaultMajorityMinCap)
}
final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency {
/**
* Java API
*/
def this(timeout: java.time.Duration) = this(timeout.asScala)
} }
final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency
sealed trait WriteConsistency { sealed trait WriteConsistency {
def timeout: FiniteDuration def timeout: FiniteDuration
@ -304,11 +321,27 @@ object Replicator {
} }
final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency { final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency {
require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1") require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1")
/**
* Java API
*/
def this(n: Int, timeout: java.time.Duration) = this(n, timeout.asScala)
} }
final case class WriteMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency { final case class WriteMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency {
def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap)
/**
* Java API
*/
def this(timeout: java.time.Duration) = this(timeout.asScala, DefaultMajorityMinCap)
}
final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency {
/**
* Java API
*/
def this(timeout: java.time.Duration) = this(timeout.asScala)
} }
final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency
/** /**
* Java API: The `ReadLocal` instance * Java API: The `ReadLocal` instance

View file

@ -9,8 +9,6 @@ import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
@ -50,8 +48,8 @@ public class DependencyInjectionDocTest extends AbstractJavaTest {
} }
@AfterClass @AfterClass
public static void afterClass() throws Exception { public static void afterClass() {
Await.ready(system.terminate(), Duration.create("5 seconds")); TestKit.shutdownActorSystem(system);
} }
//this is just to make the test below a tiny fraction nicer //this is just to make the test below a tiny fraction nicer

View file

@ -232,7 +232,7 @@ public class FaultHandlingDocSample {
// Restart the storage child when StorageException is thrown. // Restart the storage child when StorageException is thrown.
// After 3 restarts within 5 seconds it will be stopped. // After 3 restarts within 5 seconds it will be stopped.
private static final SupervisorStrategy strategy = private static final SupervisorStrategy strategy =
new OneForOneStrategy(3, scala.concurrent.duration.Duration.create("5 seconds"), DeciderBuilder. new OneForOneStrategy(3, Duration.ofSeconds(5), DeciderBuilder.
match(StorageException.class, e -> restart()). match(StorageException.class, e -> restart()).
matchAny(o -> escalate()).build()); matchAny(o -> escalate()).build());

View file

@ -12,7 +12,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.time.Duration;
import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.ask;
@ -21,7 +21,6 @@ import akka.testkit.TestProbe;
import akka.testkit.ErrorFilter; import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter; import akka.testkit.EventFilter;
import akka.testkit.TestEvent; import akka.testkit.TestEvent;
import scala.concurrent.duration.Duration;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static akka.japi.Util.immutableSeq; import static akka.japi.Util.immutableSeq;
import scala.concurrent.Await; import scala.concurrent.Await;
@ -55,7 +54,7 @@ public class FaultHandlingTest extends AbstractJavaTest {
//#strategy //#strategy
private static SupervisorStrategy strategy = private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), new OneForOneStrategy(10, Duration.ofMinutes(1),
DeciderBuilder DeciderBuilder
.match(ArithmeticException.class, e -> SupervisorStrategy.resume()) .match(ArithmeticException.class, e -> SupervisorStrategy.resume())
.match(NullPointerException.class, e -> SupervisorStrategy.restart()) .match(NullPointerException.class, e -> SupervisorStrategy.restart())
@ -88,7 +87,7 @@ public class FaultHandlingTest extends AbstractJavaTest {
//#strategy2 //#strategy2
private static SupervisorStrategy strategy = private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), DeciderBuilder. new OneForOneStrategy(10, Duration.ofMinutes(1), DeciderBuilder.
match(ArithmeticException.class, e -> SupervisorStrategy.resume()). match(ArithmeticException.class, e -> SupervisorStrategy.resume()).
match(NullPointerException.class, e -> SupervisorStrategy.restart()). match(NullPointerException.class, e -> SupervisorStrategy.restart()).
match(IllegalArgumentException.class, e -> SupervisorStrategy.stop()). match(IllegalArgumentException.class, e -> SupervisorStrategy.stop()).
@ -138,7 +137,7 @@ public class FaultHandlingTest extends AbstractJavaTest {
//#testkit //#testkit
static ActorSystem system; static ActorSystem system;
Duration timeout = Duration.create(5, SECONDS); scala.concurrent.duration.Duration timeout = scala.concurrent.duration.Duration.create(5, SECONDS);
@BeforeClass @BeforeClass
public static void start() { public static void start() {

View file

@ -14,10 +14,9 @@ import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration; import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class InitializationDocTest extends AbstractJavaTest { public class InitializationDocTest extends AbstractJavaTest {
@ -29,8 +28,8 @@ public class InitializationDocTest extends AbstractJavaTest {
} }
@AfterClass @AfterClass
public static void afterClass() throws Exception { public static void afterClass() {
Await.ready(system.terminate(), Duration.create("5 seconds")); TestKit.shutdownActorSystem(system);
} }
static public class PreStartInitExample extends AbstractActor { static public class PreStartInitExample extends AbstractActor {
@ -141,7 +140,7 @@ public class InitializationDocTest extends AbstractJavaTest {
String msg = "U OK?"; String msg = "U OK?";
testactor.tell(msg, getRef()); testactor.tell(msg, getRef());
expectNoMsg(Duration.create(1, TimeUnit.SECONDS)); expectNoMessage(Duration.ofSeconds(1));
testactor.tell("init", getRef()); testactor.tell("init", getRef());
testactor.tell(msg, getRef()); testactor.tell(msg, getRef());

View file

@ -10,13 +10,13 @@ import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.Duration;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static jdocs.actor.fsm.FSMDocTest.StateType.*; import static jdocs.actor.fsm.FSMDocTest.StateType.*;
import static jdocs.actor.fsm.FSMDocTest.Messages.*; import static jdocs.actor.fsm.FSMDocTest.Messages.*;
import static java.util.concurrent.TimeUnit.*;
import java.time.Duration;
public class FSMDocTest extends AbstractJavaTest { public class FSMDocTest extends AbstractJavaTest {
static ActorSystem system; static ActorSystem system;
@ -64,7 +64,7 @@ public class FSMDocTest extends AbstractJavaTest {
//#modifier-syntax //#modifier-syntax
when(SomeState, matchAnyEvent((msg, data) -> { when(SomeState, matchAnyEvent((msg, data) -> {
return goTo(Processing).using(newData). return goTo(Processing).using(newData).
forMax(Duration.create(5, SECONDS)).replying(WillDo); forMax(Duration.ofSeconds(5)).replying(WillDo);
})); }));
//#modifier-syntax //#modifier-syntax
@ -75,7 +75,7 @@ public class FSMDocTest extends AbstractJavaTest {
//#transition-syntax //#transition-syntax
onTransition( onTransition(
matchState(Active, Idle, () -> setTimer("timeout", matchState(Active, Idle, () -> setTimer("timeout",
Tick, java.time.Duration.ofSeconds(1L), true)). Tick, Duration.ofSeconds(1L), true)).
state(Active, null, () -> cancelTimer("timeout")). state(Active, null, () -> cancelTimer("timeout")).
state(null, Idle, (f, t) -> log().info("entering Idle from " + f))); state(null, Idle, (f, t) -> log().info("entering Idle from " + f)));
//#transition-syntax //#transition-syntax

View file

@ -15,9 +15,8 @@ import akka.testkit.javadsl.TestKit;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import docs.ddata.DistributedDataDocSpec; import docs.ddata.DistributedDataDocSpec;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
import scala.concurrent.duration.Duration; import java.time.Duration;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import org.junit.Test; import org.junit.Test;
@ -68,16 +67,16 @@ public class DistributedDataDocTest extends AbstractJavaTest {
replicator.tell(new Replicator.Update<PNCounter>(counter1Key, PNCounter.create(), replicator.tell(new Replicator.Update<PNCounter>(counter1Key, PNCounter.create(),
Replicator.writeLocal(), curr -> curr.increment(node, 1)), getSelf()); Replicator.writeLocal(), curr -> curr.increment(node, 1)), getSelf());
final WriteConsistency writeTo3 = new WriteTo(3, Duration.create(1, SECONDS)); final WriteConsistency writeTo3 = new WriteTo(3, Duration.ofSeconds(1));
replicator.tell(new Replicator.Update<GSet<String>>(set1Key, GSet.create(), replicator.tell(new Replicator.Update<GSet<String>>(set1Key, GSet.create(),
writeTo3, curr -> curr.add("hello")), getSelf()); writeTo3, curr -> curr.add("hello")), getSelf());
final WriteConsistency writeMajority = final WriteConsistency writeMajority =
new WriteMajority(Duration.create(5, SECONDS)); new WriteMajority(Duration.ofSeconds(5));
replicator.tell(new Replicator.Update<ORSet<String>>(set2Key, ORSet.create(), replicator.tell(new Replicator.Update<ORSet<String>>(set2Key, ORSet.create(),
writeMajority, curr -> curr.add(node, "hello")), getSelf()); writeMajority, curr -> curr.add(node, "hello")), getSelf());
final WriteConsistency writeAll = new WriteAll(Duration.create(5, SECONDS)); final WriteConsistency writeAll = new WriteAll(Duration.ofSeconds(5));
replicator.tell(new Replicator.Update<Flag>(activeFlagKey, Flag.create(), replicator.tell(new Replicator.Update<Flag>(activeFlagKey, Flag.create(),
writeAll, curr -> curr.switchOn()), getSelf()); writeAll, curr -> curr.switchOn()), getSelf());
}); });
@ -111,7 +110,7 @@ public class DistributedDataDocTest extends AbstractJavaTest {
final ActorRef replicator = final ActorRef replicator =
DistributedData.get(getContext().getSystem()).replicator(); DistributedData.get(getContext().getSystem()).replicator();
final WriteConsistency writeTwo = new WriteTo(2, Duration.create(3, SECONDS)); final WriteConsistency writeTwo = new WriteTo(2, Duration.ofSeconds(3));
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1"); final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
@Override @Override
@ -160,15 +159,15 @@ public class DistributedDataDocTest extends AbstractJavaTest {
replicator.tell(new Replicator.Get<PNCounter>(counter1Key, replicator.tell(new Replicator.Get<PNCounter>(counter1Key,
Replicator.readLocal()), getSelf()); Replicator.readLocal()), getSelf());
final ReadConsistency readFrom3 = new ReadFrom(3, Duration.create(1, SECONDS)); final ReadConsistency readFrom3 = new ReadFrom(3, Duration.ofSeconds(1));
replicator.tell(new Replicator.Get<GSet<String>>(set1Key, replicator.tell(new Replicator.Get<GSet<String>>(set1Key,
readFrom3), getSelf()); readFrom3), getSelf());
final ReadConsistency readMajority = new ReadMajority(Duration.create(5, SECONDS)); final ReadConsistency readMajority = new ReadMajority(Duration.ofSeconds(5));
replicator.tell(new Replicator.Get<ORSet<String>>(set2Key, replicator.tell(new Replicator.Get<ORSet<String>>(set2Key,
readMajority), getSelf()); readMajority), getSelf());
final ReadConsistency readAll = new ReadAll(Duration.create(5, SECONDS)); final ReadConsistency readAll = new ReadAll(Duration.ofSeconds(5));
replicator.tell(new Replicator.Get<Flag>(activeFlagKey, replicator.tell(new Replicator.Get<Flag>(activeFlagKey,
readAll), getSelf()); readAll), getSelf());
@ -210,7 +209,7 @@ public class DistributedDataDocTest extends AbstractJavaTest {
final ActorRef replicator = final ActorRef replicator =
DistributedData.get(getContext().getSystem()).replicator(); DistributedData.get(getContext().getSystem()).replicator();
final ReadConsistency readTwo = new ReadFrom(2, Duration.create(3, SECONDS)); final ReadConsistency readTwo = new ReadFrom(2, Duration.ofSeconds(3));
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1"); final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
@Override @Override
@ -295,7 +294,7 @@ public class DistributedDataDocTest extends AbstractJavaTest {
Replicator.writeLocal()), getSelf()); Replicator.writeLocal()), getSelf());
final WriteConsistency writeMajority = final WriteConsistency writeMajority =
new WriteMajority(Duration.create(5, SECONDS)); new WriteMajority(Duration.ofSeconds(5));
replicator.tell(new Delete<PNCounter>(counter1Key, replicator.tell(new Delete<PNCounter>(counter1Key,
writeMajority), getSelf()); writeMajority), getSelf());
}) })

View file

@ -4,13 +4,11 @@
package jdocs.ddata; package jdocs.ddata;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.time.Duration;
import scala.concurrent.duration.Duration;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
@ -39,9 +37,9 @@ public class ShoppingCart extends AbstractActor {
//#read-write-majority //#read-write-majority
private final WriteConsistency writeMajority = private final WriteConsistency writeMajority =
new WriteMajority(Duration.create(3, SECONDS)); new WriteMajority(Duration.ofSeconds(3));
private final static ReadConsistency readMajority = private final static ReadConsistency readMajority =
new ReadMajority(Duration.create(3, SECONDS)); new ReadMajority(Duration.ofSeconds(3));
//#read-write-majority //#read-write-majority
public static final String GET_CART = "getCart"; public static final String GET_CART = "getCart";

View file

@ -6,7 +6,7 @@ package jdocs.event;
import akka.event.japi.EventBus; import akka.event.japi.EventBus;
import java.util.concurrent.TimeUnit; import java.time.Duration;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
@ -17,7 +17,6 @@ import akka.actor.ActorSystem;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.util.Subclassification; import akka.util.Subclassification;
import scala.concurrent.duration.FiniteDuration;
//#lookup-bus //#lookup-bus
import akka.event.japi.LookupEventBus; import akka.event.japi.LookupEventBus;
@ -319,7 +318,7 @@ public class EventBusDocTest extends AbstractJavaTest {
Notification n2 = new Notification(observer2, 101); Notification n2 = new Notification(observer2, 101);
actorBus.publish(n2); actorBus.publish(n2);
probe2.expectMsgEquals(n2); probe2.expectMsgEquals(n2);
probe1.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS)); probe1.expectNoMessage(Duration.ofMillis(500));
//#actor-bus-test //#actor-bus-test
} }

View file

@ -10,9 +10,9 @@ import akka.actor.AbstractExtensionId;
import akka.actor.ExtensionIdProvider; import akka.actor.ExtensionIdProvider;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem; import akka.actor.ExtendedActorSystem;
import scala.concurrent.duration.Duration;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.time.Duration;
//#imports //#imports
@ -32,8 +32,7 @@ public class SettingsExtensionDocTest extends AbstractJavaTest {
public SettingsImpl(Config config) { public SettingsImpl(Config config) {
DB_URI = config.getString("myapp.db.uri"); DB_URI = config.getString("myapp.db.uri");
CIRCUIT_BREAKER_TIMEOUT = CIRCUIT_BREAKER_TIMEOUT =
Duration.create(config.getDuration("myapp.circuit-breaker.timeout", Duration.ofMillis(config.getDuration("myapp.circuit-breaker.timeout", TimeUnit.MILLISECONDS));
TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
} }
} }

View file

@ -6,8 +6,7 @@ package jdocs.pattern;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.time.Duration;
import scala.concurrent.duration.Duration;
import akka.actor.ActorKilledException; import akka.actor.ActorKilledException;
import akka.actor.ActorRef; import akka.actor.ActorRef;
@ -62,7 +61,7 @@ public class SupervisedAsk {
@Override @Override
public SupervisorStrategy supervisorStrategy() { public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(0, Duration.Zero(), cause -> { return new OneForOneStrategy(0, Duration.ZERO, cause -> {
caller.tell(new Status.Failure(cause), getSelf()); caller.tell(new Status.Failure(cause), getSelf());
return SupervisorStrategy.stop(); return SupervisorStrategy.stop();
}); });

View file

@ -12,8 +12,6 @@ import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
@ -22,7 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.time.Duration;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
@ -278,7 +276,7 @@ public class RouterDocTest extends AbstractJavaTest {
//#scatter-gather-pool-1 //#scatter-gather-pool-1
//#scatter-gather-pool-2 //#scatter-gather-pool-2
java.time.Duration within = java.time.Duration.ofSeconds(10); Duration within = Duration.ofSeconds(10);
ActorRef router18 = ActorRef router18 =
getContext().actorOf(new ScatterGatherFirstCompletedPool(5, within).props( getContext().actorOf(new ScatterGatherFirstCompletedPool(5, within).props(
Props.create(Worker.class)), "router18"); Props.create(Worker.class)), "router18");
@ -290,7 +288,7 @@ public class RouterDocTest extends AbstractJavaTest {
//#scatter-gather-group-1 //#scatter-gather-group-1
//#scatter-gather-group-2 //#scatter-gather-group-2
FiniteDuration within2 = FiniteDuration.create(10, TimeUnit.SECONDS); Duration within2 = Duration.ofSeconds(10);
ActorRef router20 = ActorRef router20 =
getContext().actorOf(new ScatterGatherFirstCompletedGroup(paths, within2).props(), getContext().actorOf(new ScatterGatherFirstCompletedGroup(paths, within2).props(),
"router20"); "router20");
@ -303,8 +301,8 @@ public class RouterDocTest extends AbstractJavaTest {
//#tail-chopping-pool-1 //#tail-chopping-pool-1
//#tail-chopping-pool-2 //#tail-chopping-pool-2
FiniteDuration within3 = FiniteDuration.create(10, TimeUnit.SECONDS); Duration within3 = Duration.ofSeconds(10);
FiniteDuration interval = FiniteDuration.create(20, TimeUnit.MILLISECONDS); Duration interval = Duration.ofMillis(20);
ActorRef router22 = ActorRef router22 =
getContext().actorOf(new TailChoppingPool(5, within3, interval).props( getContext().actorOf(new TailChoppingPool(5, within3, interval).props(
Props.create(Worker.class)), "router22"); Props.create(Worker.class)), "router22");
@ -316,8 +314,8 @@ public class RouterDocTest extends AbstractJavaTest {
//#tail-chopping-group-1 //#tail-chopping-group-1
//#tail-chopping-group-2 //#tail-chopping-group-2
FiniteDuration within4 = FiniteDuration.create(10, TimeUnit.SECONDS); Duration within4 = Duration.ofSeconds(10);
FiniteDuration interval2 = FiniteDuration.create(20, TimeUnit.MILLISECONDS); Duration interval2 = Duration.ofMillis(20);
ActorRef router24 = ActorRef router24 =
getContext().actorOf(new TailChoppingGroup(paths, within4, interval2).props(), getContext().actorOf(new TailChoppingGroup(paths, within4, interval2).props(),
"router24"); "router24");
@ -481,7 +479,7 @@ public class RouterDocTest extends AbstractJavaTest {
public void demonstrateSupervisor() { public void demonstrateSupervisor() {
//#supervision //#supervision
final SupervisorStrategy strategy = final SupervisorStrategy strategy =
new OneForOneStrategy(5, Duration.create(1, TimeUnit.MINUTES), new OneForOneStrategy(5, Duration.ofMinutes(1),
Collections.<Class<? extends Throwable>>singletonList(Exception.class)); Collections.<Class<? extends Throwable>>singletonList(Exception.class));
final ActorRef router = system.actorOf(new RoundRobinPool(5). final ActorRef router = system.actorOf(new RoundRobinPool(5).
withSupervisorStrategy(strategy).props(Props.create(Echo.class))); withSupervisorStrategy(strategy).props(Props.create(Echo.class)));

View file

@ -25,10 +25,9 @@ import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.time.Duration;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -242,7 +241,7 @@ public class ActorSubscriberDocTest extends AbstractJavaTest {
assertEquals(String.format("Expected %d, but got %s", i, got.get(i)), WorkerPoolProtocol.done(i), got.get(i)); assertEquals(String.format("Expected %d, but got %s", i, got.get(i)), WorkerPoolProtocol.done(i), got.get(i));
} }
assertEquals(String.format("Expected 117 messages but got %d", i), i, 117); assertEquals(String.format("Expected 117 messages but got %d", i), i, 117);
expectTerminated(Duration.create(10, TimeUnit.SECONDS), worker); expectTerminated(Duration.ofSeconds(10), worker);
} }
}; };
} }

View file

@ -10,23 +10,16 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.pattern.PatternsCS; import akka.pattern.PatternsCS;
import akka.remote.WireFormats;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import static org.junit.Assert.assertEquals;
public class FlowStreamRefsDocTest extends AbstractJavaTest { public class FlowStreamRefsDocTest extends AbstractJavaTest {
static ActorSystem system = null; static ActorSystem system = null;

View file

@ -12,7 +12,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.time.Duration;
import akka.NotUsed; import akka.NotUsed;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
@ -21,14 +20,11 @@ import org.junit.*;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import akka.actor.*; import akka.actor.*;
import akka.testkit.*;
import akka.japi.Pair; import akka.japi.Pair;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
import akka.stream.testkit.*; import akka.stream.testkit.*;
import akka.stream.testkit.javadsl.*; import akka.stream.testkit.javadsl.*;
import akka.testkit.TestProbe;
import scala.concurrent.duration.FiniteDuration;
public class StreamTestKitDocTest extends AbstractJavaTest { public class StreamTestKitDocTest extends AbstractJavaTest {
@ -97,14 +93,12 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
.from(Arrays.asList(1, 2, 3, 4)) .from(Arrays.asList(1, 2, 3, 4))
.grouped(2); .grouped(2);
final TestProbe probe = new TestProbe(system); final TestKit probe = new TestKit(system);
final CompletionStage<List<List<Integer>>> future = sourceUnderTest final CompletionStage<List<List<Integer>>> future = sourceUnderTest
.grouped(2) .grouped(2)
.runWith(Sink.head(), mat); .runWith(Sink.head(), mat);
akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref()); akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.getRef());
probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), probe.expectMsg(Duration.ofSeconds(3), Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)));
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))
);
//#pipeto-testprobe //#pipeto-testprobe
} }
@ -118,14 +112,13 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
Duration.ofMillis(200), Duration.ofMillis(200),
Tick.TOCK); Tick.TOCK);
final TestProbe probe = new TestProbe(system); final TestKit probe = new TestKit(system);
final Cancellable cancellable = sourceUnderTest final Cancellable cancellable = sourceUnderTest.to(Sink.actorRef(probe.getRef(), Tick.COMPLETED)).run(mat);
.to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat); probe.expectMsg(Duration.ofSeconds(3), Tick.TOCK);
probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.TOCK); probe.expectNoMessage(Duration.ofMillis(100));
probe.expectNoMsg(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); probe.expectMsg(Duration.ofSeconds(3), Tick.TOCK);
probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.TOCK);
cancellable.cancel(); cancellable.cancel();
probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.COMPLETED); probe.expectMsg(Duration.ofSeconds(3), Tick.COMPLETED);
//#sink-actorref //#sink-actorref
} }

View file

@ -16,13 +16,10 @@ import akka.util.Timeout;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.assertTrue;
@ -187,7 +184,7 @@ public class RecipeGlobalRateLimit extends RecipeTest {
} }
}; };
final java.time.Duration twoSeconds = dilated(java.time.Duration.ofSeconds(2)); final Duration twoSeconds = dilated(Duration.ofSeconds(2));
final Sink<String, TestSubscriber.Probe<String>> sink = TestSink.probe(system); final Sink<String, TestSubscriber.Probe<String>> sink = TestSink.probe(system);
final TestSubscriber.Probe<String> probe = final TestSubscriber.Probe<String> probe =
@ -210,15 +207,15 @@ public class RecipeGlobalRateLimit extends RecipeTest {
probe.expectSubscription().request(1000); probe.expectSubscription().request(1000);
FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS); Duration fiveHundredMillis = Duration.ofMillis(500);
assertTrue(probe.expectNext().startsWith("E")); assertTrue(probe.expectNext().startsWith("E"));
assertTrue(probe.expectNext().startsWith("E")); assertTrue(probe.expectNext().startsWith("E"));
probe.expectNoMsg(fiveHundredMillis); probe.expectNoMessage(fiveHundredMillis);
limiter.tell(Limiter.REPLENISH_TOKENS, getTestActor()); limiter.tell(Limiter.REPLENISH_TOKENS, getTestActor());
assertTrue(probe.expectNext().startsWith("E")); assertTrue(probe.expectNext().startsWith("E"));
probe.expectNoMsg(fiveHundredMillis); probe.expectNoMessage(fiveHundredMillis);
final Set<String> resultSet = new HashSet<>(); final Set<String> resultSet = new HashSet<>();
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {

View file

@ -23,8 +23,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -76,9 +74,10 @@ public class RecipeMissedTicks extends RecipeTest {
pub.sendNext(Tick); pub.sendNext(Tick);
pub.sendNext(Tick); pub.sendNext(Tick);
FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS); scala.concurrent.duration.FiniteDuration timeout =
scala.concurrent.duration.FiniteDuration.create(200, TimeUnit.MILLISECONDS);
Await.ready(latch, Duration.create(1, TimeUnit.SECONDS)); Await.ready(latch, scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS));
sub.request(1); sub.request(1);
sub.expectNext(3); sub.expectNext(3);

View file

@ -9,6 +9,7 @@ import akka.annotation.InternalApi
import akka.persistence.fsm.PersistentFSM.FSMState import akka.persistence.fsm.PersistentFSM.FSMState
import akka.persistence.serialization.Message import akka.persistence.serialization.Message
import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer } import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer }
import akka.util.JavaDurationConverters
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.annotation.varargs import scala.annotation.varargs
@ -361,6 +362,18 @@ object PersistentFSM {
case _ copy(timeout = PersistentFSM.SomeMaxFiniteDuration) // we need to differentiate "not set" from disabled case _ copy(timeout = PersistentFSM.SomeMaxFiniteDuration) // we need to differentiate "not set" from disabled
} }
/**
* Java API: Modify state transition descriptor to include a state timeout for the
* next state. This timeout overrides any default timeout set for the next
* state.
*
* Use Duration.Inf to deactivate an existing timeout.
*/
def forMax(timeout: java.time.Duration): State[S, D, E] = {
import JavaDurationConverters._
forMax(timeout.asScala)
}
/** /**
* Send reply to sender of the current message, if available. * Send reply to sender of the current message, if available.
* *

View file

@ -17,7 +17,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.time.Duration; import java.time.Duration;
import akka.persistence.fsm.PersistentFSM.CurrentState; import akka.persistence.fsm.PersistentFSM.CurrentState;
@ -511,8 +510,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
matchEvent(AddItem.class, matchEvent(AddItem.class,
(event, data) -> (event, data) ->
goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem())) goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem()))
.forMax(scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS)) .forMax(Duration.ofSeconds(1)))
)
.event(GetCurrentCart.class, (event, data) -> stay().replying(data)) .event(GetCurrentCart.class, (event, data) -> stay().replying(data))
); );
@ -520,7 +518,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
matchEvent(AddItem.class, matchEvent(AddItem.class,
(event, data) -> (event, data) ->
stay().applying(new ItemAdded(event.getItem())) stay().applying(new ItemAdded(event.getItem()))
.forMax(scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS))) .forMax(Duration.ofSeconds(1)))
.event(Buy.class, .event(Buy.class,
//#customer-andthen-example //#customer-andthen-example
(event, data) -> (event, data) ->
@ -544,7 +542,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
.event(GetCurrentCart.class, (event, data) -> stay().replying(data)) .event(GetCurrentCart.class, (event, data) -> stay().replying(data))
.event(StateTimeout$.class, .event(StateTimeout$.class,
(event, data) -> (event, data) ->
goTo(UserState.INACTIVE).forMax(scala.concurrent.duration.Duration.create(2, TimeUnit.SECONDS))) goTo(UserState.INACTIVE).forMax(Duration.ofSeconds(2)))
); );
@ -552,7 +550,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
matchEvent(AddItem.class, matchEvent(AddItem.class,
(event, data) -> (event, data) ->
goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem())) goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem()))
.forMax(scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS))) .forMax(Duration.ofSeconds(1)))
.event(GetCurrentCart.class, (event, data) -> stay().replying(data)) .event(GetCurrentCart.class, (event, data) -> stay().replying(data))
.event(StateTimeout$.class, .event(StateTimeout$.class,
(event, data) -> (event, data) ->

View file

@ -18,6 +18,7 @@ import java.io.PrintWriter
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import akka.testkit.TestActor.AutoPilot import akka.testkit.TestActor.AutoPilot
import akka.util.JavaDurationConverters
/** /**
* Provides factory methods for various Publishers. * Provides factory methods for various Publishers.
@ -599,6 +600,15 @@ object TestSubscriber {
self self
} }
/**
* Java API: Assert that no message is received for the specified time.
*/
def expectNoMessage(remaining: java.time.Duration): Self = {
import JavaDurationConverters._
probe.expectNoMessage(remaining.asScala)
self
}
/** /**
* Expect a stream element and test it with partial function. * Expect a stream element and test it with partial function.
* *

View file

@ -66,7 +66,7 @@ public class FlowTest extends StreamTest {
final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5); final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5);
final Source<Integer, NotUsed> ints = Source.from(input); final Source<Integer, NotUsed> ints = Source.from(input);
final Flow<Integer, String, NotUsed> flow1 = Flow.of(Integer.class).drop(2).take(3 final Flow<Integer, String, NotUsed> flow1 = Flow.of(Integer.class).drop(2).take(3
).takeWithin(java.time.Duration.ofSeconds(10 ).takeWithin(Duration.ofSeconds(10
)).map(new Function<Integer, String>() { )).map(new Function<Integer, String>() {
public String apply(Integer elem) { public String apply(Integer elem) {
return lookup[elem]; return lookup[elem];
@ -81,7 +81,7 @@ public class FlowTest extends StreamTest {
public java.util.List<String> apply(java.util.List<String> elem) { public java.util.List<String> apply(java.util.List<String> elem) {
return elem; return elem;
} }
}).groupedWithin(100, java.time.Duration.ofMillis(50) }).groupedWithin(100, Duration.ofMillis(50)
).mapConcat(new Function<java.util.List<String>, java.lang.Iterable<String>>() { ).mapConcat(new Function<java.util.List<String>, java.lang.Iterable<String>>() {
public java.util.List<String> apply(java.util.List<String> elem) { public java.util.List<String> apply(java.util.List<String> elem) {
return elem; return elem;
@ -189,10 +189,7 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals(0); probe.expectMsgEquals(0);
probe.expectMsgEquals(1); probe.expectMsgEquals(1);
probe.expectNoMessage(Duration.ofMillis(200));
Duration duration = Duration.ofMillis(200);
probe.expectNoMessage(duration);
future.toCompletableFuture().get(3, TimeUnit.SECONDS); future.toCompletableFuture().get(3, TimeUnit.SECONDS);
} }

View file

@ -14,7 +14,6 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import akka.Done;
import akka.NotUsed; import akka.NotUsed;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.function.Function; import akka.japi.function.Function;
@ -23,10 +22,6 @@ import akka.testkit.javadsl.TestKit;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.japi.function.Function2;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;

View file

@ -12,12 +12,8 @@ import akka.stream.javadsl.Source;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;