+act add java.time.Duration support Actor javadsl (#24993)
* add java support in akka-actor module in the class Abstract.ActorContext by adding the setReceiveTimeout and CancelReceiveTimeout * add akka.actor.AbstractActor#ActorContext cancelReceiveTimeout and setReceiveTimeout to mima-excludes file * removed scala.concurrent.duration dependency in ActorDocTest and add new method expectTerminated with java.time.Duration support in TestKit * used java.time.Duration as default import
This commit is contained in:
parent
55fb092bb2
commit
759010f0cd
9 changed files with 72 additions and 33 deletions
|
|
@ -18,8 +18,8 @@ import akka.Done;
|
|||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.time.Duration;
|
||||
import akka.testkit.TestActors;
|
||||
import scala.concurrent.Await;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
|
@ -47,7 +47,6 @@ import java.util.concurrent.CompletableFuture;
|
|||
//#import-gracefulStop
|
||||
import static akka.pattern.PatternsCS.gracefulStop;
|
||||
import akka.pattern.AskTimeoutException;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
||||
//#import-gracefulStop
|
||||
|
|
@ -73,8 +72,8 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
Await.ready(system.terminate(), Duration.create(5, TimeUnit.SECONDS));
|
||||
public static void afterClass() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
}
|
||||
|
||||
static
|
||||
|
|
@ -412,7 +411,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
//#gracefulStop
|
||||
try {
|
||||
CompletionStage<Boolean> stopped =
|
||||
gracefulStop(actorRef, java.time.Duration.ofSeconds(5), Manager.SHUTDOWN);
|
||||
gracefulStop(actorRef, Duration.ofSeconds(5), Manager.SHUTDOWN);
|
||||
stopped.toCompletableFuture().get(6, TimeUnit.SECONDS);
|
||||
// the actor has been stopped
|
||||
} catch (AskTimeoutException e) {
|
||||
|
|
@ -535,7 +534,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
//#receive-timeout
|
||||
public ReceiveTimeoutActor() {
|
||||
// To set an initial delay
|
||||
getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
|
||||
getContext().setReceiveTimeout(Duration.ofSeconds(10));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -543,7 +542,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
return receiveBuilder()
|
||||
.matchEquals("Hello", s -> {
|
||||
// To set in a response to a message
|
||||
getContext().setReceiveTimeout(Duration.create(1, TimeUnit.SECONDS));
|
||||
getContext().setReceiveTimeout(Duration.ofSeconds(1));
|
||||
//#receive-timeout
|
||||
target = getSender();
|
||||
target.tell("Hello world", getSelf());
|
||||
|
|
@ -551,7 +550,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
})
|
||||
.match(ReceiveTimeout.class, r -> {
|
||||
// To turn it off
|
||||
getContext().setReceiveTimeout(Duration.Undefined());
|
||||
getContext().cancelReceiveTimeout();
|
||||
//#receive-timeout
|
||||
target.tell("timeout", getSelf());
|
||||
//#receive-timeout
|
||||
|
|
@ -628,7 +627,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
actor.tell("foo", getRef());
|
||||
actor.tell("foo", getRef());
|
||||
expectMsgEquals("I am already angry?");
|
||||
expectNoMsg(Duration.create(1, TimeUnit.SECONDS));
|
||||
expectNoMessage(Duration.ofSeconds(1));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
@ -741,7 +740,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
{
|
||||
watch(b);
|
||||
system.stop(a);
|
||||
assertEquals(expectMsgClass(java.time.Duration.ofSeconds(2), Terminated.class).actor(), b);
|
||||
assertEquals(expectMsgClass(Duration.ofSeconds(2), Terminated.class).actor(), b);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
@ -755,7 +754,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
ActorRef actorC = getRef();
|
||||
|
||||
//#ask-pipe
|
||||
Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
|
||||
Timeout t = Timeout.create(Duration.ofSeconds(5));
|
||||
|
||||
// using 1000ms timeout
|
||||
CompletableFuture<Object> future1 =
|
||||
|
|
@ -791,7 +790,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender());
|
||||
|
||||
// expecting the actor to indeed terminate:
|
||||
expectTerminated(Duration.create(3, TimeUnit.SECONDS), victim);
|
||||
expectTerminated(Duration.ofSeconds(3), victim);
|
||||
//#kill
|
||||
}
|
||||
};
|
||||
|
|
@ -806,7 +805,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
//#poison-pill
|
||||
victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());
|
||||
//#poison-pill
|
||||
expectTerminated(Duration.create(3, TimeUnit.SECONDS), victim);
|
||||
expectTerminated(Duration.ofSeconds(3), victim);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.time.Duration;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.dispatch.Mapper;
|
||||
|
|
@ -19,7 +20,6 @@ import akka.pattern.Patterns;
|
|||
import akka.util.Timeout;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import static akka.japi.Util.classTag;
|
||||
import static akka.actor.SupervisorStrategy.restart;
|
||||
|
|
@ -67,7 +67,7 @@ public class FaultHandlingDocSample {
|
|||
public void preStart() {
|
||||
// If we don't get any progress within 15 seconds then the service
|
||||
// is unavailable
|
||||
getContext().setReceiveTimeout(Duration.create("15 seconds"));
|
||||
getContext().setReceiveTimeout(Duration.ofSeconds(15));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -114,7 +114,7 @@ public class FaultHandlingDocSample {
|
|||
* The Worker supervise the CounterService.
|
||||
*/
|
||||
public static class Worker extends AbstractLoggingActor {
|
||||
final Timeout askTimeout = new Timeout(Duration.create(5, "seconds"));
|
||||
final Timeout askTimeout = Timeout.create(Duration.ofSeconds(5));
|
||||
|
||||
// The sender of the initial Start message will continuously be notified
|
||||
// about progress
|
||||
|
|
@ -140,7 +140,7 @@ public class FaultHandlingDocSample {
|
|||
matchEquals(Start, x -> progressListener == null, x -> {
|
||||
progressListener = getSender();
|
||||
getContext().getSystem().scheduler().schedule(
|
||||
java.time.Duration.ZERO, java.time.Duration.ofSeconds(1L), getSelf(), Do,
|
||||
Duration.ZERO, Duration.ofSeconds(1L), getSelf(), Do,
|
||||
getContext().dispatcher(), null
|
||||
);
|
||||
}).
|
||||
|
|
@ -232,7 +232,7 @@ public class FaultHandlingDocSample {
|
|||
// Restart the storage child when StorageException is thrown.
|
||||
// After 3 restarts within 5 seconds it will be stopped.
|
||||
private static final SupervisorStrategy strategy =
|
||||
new OneForOneStrategy(3, Duration.create("5 seconds"), DeciderBuilder.
|
||||
new OneForOneStrategy(3, scala.concurrent.duration.Duration.create("5 seconds"), DeciderBuilder.
|
||||
match(StorageException.class, e -> restart()).
|
||||
matchAny(o -> escalate()).build());
|
||||
|
||||
|
|
@ -292,7 +292,7 @@ public class FaultHandlingDocSample {
|
|||
counter.tell(new UseStorage(null), getSelf());
|
||||
// Try to re-establish storage after while
|
||||
getContext().getSystem().scheduler().scheduleOnce(
|
||||
Duration.create(10, "seconds"), getSelf(), Reconnect,
|
||||
Duration.ofSeconds(10), getSelf(), Reconnect,
|
||||
getContext().dispatcher(), null);
|
||||
}).
|
||||
matchEquals(Reconnect, o -> {
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.time.Duration;
|
||||
|
||||
import akka.actor.Props;
|
||||
import akka.cluster.metrics.AdaptiveLoadBalancingGroup;
|
||||
|
|
@ -19,7 +19,6 @@ import akka.cluster.routing.ClusterRouterGroup;
|
|||
import akka.cluster.routing.ClusterRouterGroupSettings;
|
||||
import akka.cluster.routing.ClusterRouterPool;
|
||||
import akka.cluster.routing.ClusterRouterPoolSettings;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ReceiveTimeout;
|
||||
import akka.actor.AbstractActor;
|
||||
|
|
@ -45,7 +44,7 @@ public class FactorialFrontend extends AbstractActor {
|
|||
@Override
|
||||
public void preStart() {
|
||||
sendJobs();
|
||||
getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
|
||||
getContext().setReceiveTimeout(Duration.ofSeconds(10));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -6,11 +6,10 @@ package jdocs.cluster;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.time.Duration;
|
||||
|
||||
import jdocs.cluster.StatsMessages.JobFailed;
|
||||
import jdocs.cluster.StatsMessages.StatsResult;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ReceiveTimeout;
|
||||
import akka.actor.AbstractActor;
|
||||
|
|
@ -29,7 +28,7 @@ public class StatsAggregator extends AbstractActor {
|
|||
|
||||
@Override
|
||||
public void preStart() {
|
||||
getContext().setReceiveTimeout(Duration.create(3, TimeUnit.SECONDS));
|
||||
getContext().setReceiveTimeout(Duration.ofSeconds(3));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -4,10 +4,9 @@
|
|||
|
||||
package jdocs.sharding;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import java.util.Optional;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import java.time.Duration;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorInitializationException;
|
||||
|
|
@ -17,7 +16,6 @@ import akka.actor.OneForOneStrategy;
|
|||
import akka.actor.PoisonPill;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.SupervisorStrategy;
|
||||
import akka.actor.Terminated;
|
||||
import akka.actor.ReceiveTimeout;
|
||||
//#counter-extractor
|
||||
import akka.cluster.sharding.ShardRegion;
|
||||
|
|
@ -31,7 +29,6 @@ import akka.cluster.sharding.ClusterShardingSettings;
|
|||
|
||||
//#counter-start
|
||||
import akka.persistence.AbstractPersistentActor;
|
||||
import akka.cluster.Cluster;
|
||||
import akka.japi.pf.DeciderBuilder;
|
||||
|
||||
// Doc code, compile only
|
||||
|
|
@ -200,7 +197,7 @@ public class ClusterShardingTest {
|
|||
@Override
|
||||
public void preStart() throws Exception {
|
||||
super.preStart();
|
||||
getContext().setReceiveTimeout(Duration.create(120, SECONDS));
|
||||
getContext().setReceiveTimeout(Duration.ofSeconds(120));
|
||||
}
|
||||
|
||||
void updateState(CounterChanged event) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue