From 6ef6ae1446031c5838c66b6dbdd4240b1ac2bb8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Sowi=C5=84ski?= Date: Tue, 15 Jul 2025 06:21:24 +0200 Subject: [PATCH] Allow overriding dispatcher in mapWithResource (#1949) * Allow overriding the dispatcher in mapWithResource Closes #1948 --- .../scaladsl/FlowMapWithResourceSpec.scala | 27 ++++++++++++++++++- .../apache/pekko/stream/scaladsl/Flow.scala | 2 +- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala index 56db0ed625..894e372a8a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala @@ -33,7 +33,7 @@ import com.google.common.jimfs.{ Configuration, Jimfs } import org.apache.pekko import pekko.Done -import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, SystemMaterializer } +import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, Attributes, SystemMaterializer } import pekko.stream.ActorAttributes.supervisionStrategy import pekko.stream.Supervision.{ restartingDecider, resumingDecider, stoppingDecider } import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } @@ -231,6 +231,31 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) { finally p.cancel() } + "allow overriding the default dispatcher" in { + val p = Source + .single(1) + .mapWithResource(() => newBufferedReader())( + (reader, _) => Option(reader.readLine()), + reader => { + reader.close() + None + }) + .withAttributes( + Attributes.name("mapWithResourceCustomDispatcher") and + ActorAttributes.dispatcher("pekko.actor.default-dispatcher") + ) + .runWith(TestSink.probe) + + SystemMaterializer(system).materializer + .asInstanceOf[PhasedFusingActorMaterializer] + .supervisor + .tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children + .find(_.path.toString contains "mapWithResourceCustomDispatcher").get + try assertDispatcher(ref, "pekko.actor.default-dispatcher") + finally p.cancel() + } + "fail when create throws exception" in { EventFilter[TE](occurrences = 1).intercept { val p = Source diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 451b43ca0d..e178a5c0b1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1190,7 +1190,7 @@ trait FlowOps[+Out, +Mat] { create, (resource, out) => (resource, f(resource, out)), resource => close(resource)) - .withAttributes(DefaultAttributes.mapWithResource)) + ).withAttributes(DefaultAttributes.mapWithResource) /** * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails.