StreamRefs cancellation behavior #27639

* Updated multi-jvm test
* Updated migration guide
This commit is contained in:
Johan Andrén 2019-09-06 08:55:51 +02:00
parent ed955e0da4
commit 66280846c1
2 changed files with 16 additions and 13 deletions

View file

@ -5,11 +5,7 @@
package akka.cluster
import akka.Done
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.Props
import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, Identify, Props }
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
@ -52,7 +48,7 @@ object StreamRefSpec extends MultiNodeConfig {
Props(new DataSource(streamLifecycleProbe))
}
class DataSource(streamLifecycleProbe: ActorRef) extends Actor {
class DataSource(streamLifecycleProbe: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
implicit val mat = Materializer(context)
@ -72,8 +68,11 @@ object StreamRefSpec extends MultiNodeConfig {
.run()
done.onComplete {
case Success(_) => streamLifecycleProbe ! s"completed-$streamId"
case Failure(_) => streamLifecycleProbe ! s"failed-$streamId"
case Success(_) =>
streamLifecycleProbe ! s"completed-$streamId"
case Failure(ex) =>
log.info("Source stream completed with failure: {}", ex)
streamLifecycleProbe ! s"failed-$streamId"
}
// wrap the SourceRef in some domain message, such that the sender knows what source it is
@ -94,7 +93,7 @@ object StreamRefSpec extends MultiNodeConfig {
Props(new DataReceiver(streamLifecycleProbe))
}
class DataReceiver(streamLifecycleProbe: ActorRef) extends Actor {
class DataReceiver(streamLifecycleProbe: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
implicit val mat = Materializer(context)
@ -115,7 +114,9 @@ object StreamRefSpec extends MultiNodeConfig {
done.onComplete {
case Success(_) => streamLifecycleProbe ! s"completed-$nodeId"
case Failure(_) => streamLifecycleProbe ! s"failed-$nodeId"
case Failure(ex) =>
log.info("Sink stream completed with failure: {}", ex)
streamLifecycleProbe ! s"failed-$nodeId"
}
// wrap the SinkRef in some domain message, such that the sender knows what source it is
@ -186,8 +187,8 @@ abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec) with MultiNode
destinationForSource.expectError().getClass should ===(classOf[RemoteStreamRefActorTerminatedException])
}
runOn(second) {
// it will be cancelled, i.e. competed
dataSourceLifecycle.expectMsg("completed-1337")
// it will be cancelled with a failure
dataSourceLifecycle.expectMsg("failed-1337")
}
enterBarrier("after-2")
@ -239,7 +240,8 @@ abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec) with MultiNode
enterBarrier("members-removed")
runOn(first) {
streamLifecycle1.expectMsg("completed-system-42-tmp")
// failure propagated upstream
streamLifecycle1.expectMsg("failed-system-42-tmp")
}
runOn(third) {
streamLifecycle3.expectMsg("failed-system-42-tmp")