diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala index f8c7410c22..1394ca1b97 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala @@ -5,11 +5,7 @@ package akka.cluster import akka.Done -import akka.actor.Actor -import akka.actor.ActorIdentity -import akka.actor.ActorRef -import akka.actor.Identify -import akka.actor.Props +import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, Identify, Props } import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction @@ -52,7 +48,7 @@ object StreamRefSpec extends MultiNodeConfig { Props(new DataSource(streamLifecycleProbe)) } - class DataSource(streamLifecycleProbe: ActorRef) extends Actor { + class DataSource(streamLifecycleProbe: ActorRef) extends Actor with ActorLogging { import context.dispatcher implicit val mat = Materializer(context) @@ -72,8 +68,11 @@ object StreamRefSpec extends MultiNodeConfig { .run() done.onComplete { - case Success(_) => streamLifecycleProbe ! s"completed-$streamId" - case Failure(_) => streamLifecycleProbe ! s"failed-$streamId" + case Success(_) => + streamLifecycleProbe ! s"completed-$streamId" + case Failure(ex) => + log.info("Source stream completed with failure: {}", ex) + streamLifecycleProbe ! s"failed-$streamId" } // wrap the SourceRef in some domain message, such that the sender knows what source it is @@ -94,7 +93,7 @@ object StreamRefSpec extends MultiNodeConfig { Props(new DataReceiver(streamLifecycleProbe)) } - class DataReceiver(streamLifecycleProbe: ActorRef) extends Actor { + class DataReceiver(streamLifecycleProbe: ActorRef) extends Actor with ActorLogging { import context.dispatcher implicit val mat = Materializer(context) @@ -115,7 +114,9 @@ object StreamRefSpec extends MultiNodeConfig { done.onComplete { case Success(_) => streamLifecycleProbe ! s"completed-$nodeId" - case Failure(_) => streamLifecycleProbe ! s"failed-$nodeId" + case Failure(ex) => + log.info("Sink stream completed with failure: {}", ex) + streamLifecycleProbe ! s"failed-$nodeId" } // wrap the SinkRef in some domain message, such that the sender knows what source it is @@ -186,8 +187,8 @@ abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec) with MultiNode destinationForSource.expectError().getClass should ===(classOf[RemoteStreamRefActorTerminatedException]) } runOn(second) { - // it will be cancelled, i.e. competed - dataSourceLifecycle.expectMsg("completed-1337") + // it will be cancelled with a failure + dataSourceLifecycle.expectMsg("failed-1337") } enterBarrier("after-2") @@ -239,7 +240,8 @@ abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec) with MultiNode enterBarrier("members-removed") runOn(first) { - streamLifecycle1.expectMsg("completed-system-42-tmp") + // failure propagated upstream + streamLifecycle1.expectMsg("failed-system-42-tmp") } runOn(third) { streamLifecycle3.expectMsg("failed-system-42-tmp") diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 437a96c52a..fea8fc2ae6 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -550,6 +550,7 @@ The following operators have a slight change in behavior because of this: * `FileIO.fromPath`, `FileIO.fromFile` and `StreamConverters.fromInputStream` will fail the materialized future with an `IOOperationIncompleteException` when downstream fails * `.watchTermination` will fail the materialized `Future` or `CompletionStage` rather than completing it when downstream fails +* `StreamRef` - `SourceRef` will cancel with a failure when the receiving node is downed This also means that custom `GraphStage` implementations should be changed to pass on the cancellation cause when downstream cancels by implementing the `OutHandler.onDownstreamFinish` signature