Merge pull request #26202 from akka/wip-25824-stashing-pipeToSelf-patriknw
stashing and pipeToSelf, #25824
This commit is contained in:
commit
c8e6500f7c
2 changed files with 42 additions and 52 deletions
|
|
@ -12,8 +12,9 @@ import akka.Done;
|
|||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.actor.testkit.typed.javadsl.TestInbox;
|
||||
import akka.actor.testkit.typed.javadsl.BehaviorTestKit;
|
||||
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||
import akka.actor.testkit.typed.javadsl.TestProbe;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
|
|
@ -90,12 +91,11 @@ public class StashDocTest extends JUnitSuite {
|
|||
|
||||
Behavior<Command> behavior() {
|
||||
return Behaviors.setup(context -> {
|
||||
db.load(id)
|
||||
.whenComplete((value, cause) -> {
|
||||
context.pipeToSelf(db.load(id), (value, cause) -> {
|
||||
if (cause == null)
|
||||
context.getSelf().tell(new InitialState(value));
|
||||
return new InitialState(value);
|
||||
else
|
||||
context.getSelf().tell(new DBError(asRuntimeException(cause)));
|
||||
return new DBError(asRuntimeException(cause));
|
||||
});
|
||||
|
||||
return init();
|
||||
|
|
@ -126,12 +126,11 @@ public class StashDocTest extends JUnitSuite {
|
|||
return Behaviors.same();
|
||||
})
|
||||
.onMessage(Save.class, (context, message) -> {
|
||||
db.save(id, message.payload)
|
||||
.whenComplete((value, cause) -> {
|
||||
context.pipeToSelf(db.save(id, message.payload), (value, cause) -> {
|
||||
if (cause == null)
|
||||
context.getSelf().tell(SaveSuccess.instance);
|
||||
return SaveSuccess.instance;
|
||||
else
|
||||
context.getSelf().tell(new DBError(asRuntimeException(cause)));
|
||||
return new DBError(asRuntimeException(cause));
|
||||
});
|
||||
return saving(message.payload, message.replyTo);
|
||||
})
|
||||
|
|
@ -168,6 +167,10 @@ public class StashDocTest extends JUnitSuite {
|
|||
|
||||
//#stashing
|
||||
|
||||
@ClassRule
|
||||
public static final TestKitJunitResource testKit = new TestKitJunitResource();
|
||||
|
||||
|
||||
@Test
|
||||
public void stashingExample() throws Exception {
|
||||
final DB db = new DB() {
|
||||
|
|
@ -178,23 +181,19 @@ public class StashDocTest extends JUnitSuite {
|
|||
return CompletableFuture.completedFuture("TheValue");
|
||||
}
|
||||
};
|
||||
final DataAccess dataAccess = new DataAccess("17", db);
|
||||
BehaviorTestKit<DataAccess.Command> testKit = BehaviorTestKit.create(dataAccess.behavior());
|
||||
TestInbox<String> getInbox = TestInbox.create("getInbox");
|
||||
testKit.run(new DataAccess.Get(getInbox.getRef()));
|
||||
DataAccess.Command initialStateMsg = testKit.selfInbox().receiveMessage();
|
||||
testKit.run(initialStateMsg);
|
||||
|
||||
final ActorRef<DataAccess.Command> dataAccess = testKit.spawn(new DataAccess("17", db).behavior());
|
||||
TestProbe<String> getInbox = testKit.createTestProbe(String.class);
|
||||
dataAccess.tell(new DataAccess.Get(getInbox.getRef()));
|
||||
getInbox.expectMessage("TheValue");
|
||||
|
||||
TestInbox<Done> saveInbox = TestInbox.create("saveInbox");
|
||||
testKit.run(new DataAccess.Save("UpdatedValue", saveInbox.getRef()));
|
||||
testKit.run(new DataAccess.Get(getInbox.getRef()));
|
||||
DataAccess.Command saveSuccessMsg = testKit.selfInbox().receiveMessage();
|
||||
testKit.run(saveSuccessMsg);
|
||||
TestProbe<Done> saveInbox = testKit.createTestProbe(Done.class);
|
||||
dataAccess.tell(new DataAccess.Save("UpdatedValue", saveInbox.getRef()));
|
||||
dataAccess.tell(new DataAccess.Get(getInbox.getRef()));
|
||||
saveInbox.expectMessage(Done.getInstance());
|
||||
getInbox.expectMessage("UpdatedValue");
|
||||
|
||||
testKit.run(new DataAccess.Get(getInbox.getRef()));
|
||||
dataAccess.tell(new DataAccess.Get(getInbox.getRef()));
|
||||
getInbox.expectMessage("UpdatedValue");
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,13 +7,13 @@ package docs.akka.typed
|
|||
import scala.concurrent.Future
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.testkit.typed.scaladsl.{ BehaviorTestKit, TestInbox }
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
object StashDocSpec {
|
||||
// #stashing
|
||||
|
|
@ -59,10 +59,9 @@ object StashDocSpec {
|
|||
replyTo ! state
|
||||
Behaviors.same
|
||||
case Save(value, replyTo) ⇒
|
||||
import context.executionContext
|
||||
db.save(id, value).onComplete {
|
||||
case Success(_) ⇒ context.self ! SaveSuccess
|
||||
case Failure(cause) ⇒ context.self ! DBError(cause)
|
||||
context.pipeToSelf(db.save(id, value)) {
|
||||
case Success(_) ⇒ SaveSuccess
|
||||
case Failure(cause) ⇒ DBError(cause)
|
||||
}
|
||||
saving(value, replyTo)
|
||||
}
|
||||
|
|
@ -82,12 +81,9 @@ object StashDocSpec {
|
|||
}
|
||||
}
|
||||
|
||||
import context.executionContext
|
||||
db.load(id).onComplete {
|
||||
case Success(value) ⇒
|
||||
context.self ! InitialState(value)
|
||||
case Failure(cause) ⇒
|
||||
context.self ! DBError(cause)
|
||||
context.pipeToSelf(db.load(id)) {
|
||||
case Success(value) ⇒ InitialState(value)
|
||||
case Failure(cause) ⇒ DBError(cause)
|
||||
}
|
||||
|
||||
init()
|
||||
|
|
@ -96,7 +92,7 @@ object StashDocSpec {
|
|||
// #stashing
|
||||
}
|
||||
|
||||
class StashDocSpec extends WordSpec with Matchers {
|
||||
class StashDocSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||
import StashDocSpec.DB
|
||||
import StashDocSpec.DataAccess
|
||||
|
||||
|
|
@ -108,24 +104,19 @@ class StashDocSpec extends WordSpec with Matchers {
|
|||
override def save(id: String, value: String): Future[Done] = Future.successful(Done)
|
||||
override def load(id: String): Future[String] = Future.successful("TheValue")
|
||||
}
|
||||
val testKit = BehaviorTestKit(DataAccess.behavior(id = "17", db))
|
||||
val getInbox = TestInbox[String]()
|
||||
testKit.run(DataAccess.Get(getInbox.ref))
|
||||
val initialStateMsg = testKit.selfInbox().receiveMessage()
|
||||
testKit.run(initialStateMsg)
|
||||
getInbox.expectMessage("TheValue")
|
||||
val dataAccess = spawn(DataAccess.behavior(id = "17", db))
|
||||
val getProbe = createTestProbe[String]()
|
||||
dataAccess ! DataAccess.Get(getProbe.ref)
|
||||
getProbe.expectMessage("TheValue")
|
||||
|
||||
val saveInbox = TestInbox[Done]()
|
||||
testKit.run(DataAccess.Save("UpdatedValue", saveInbox.ref))
|
||||
testKit.run(DataAccess.Get(getInbox.ref))
|
||||
val saveSuccessMsg = testKit.selfInbox().receiveMessage()
|
||||
testKit.run(saveSuccessMsg)
|
||||
saveInbox.expectMessage(Done)
|
||||
getInbox.expectMessage("UpdatedValue")
|
||||
|
||||
testKit.run(DataAccess.Get(getInbox.ref))
|
||||
getInbox.expectMessage("UpdatedValue")
|
||||
val saveProbe = createTestProbe[Done]()
|
||||
dataAccess ! DataAccess.Save("UpdatedValue", saveProbe.ref)
|
||||
dataAccess ! DataAccess.Get(getProbe.ref)
|
||||
saveProbe.expectMessage(Done)
|
||||
getProbe.expectMessage("UpdatedValue")
|
||||
|
||||
dataAccess ! DataAccess.Get(getProbe.ref)
|
||||
getProbe.expectMessage("UpdatedValue")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue