From 0f4abb687f3d94f8782aa277456e0eae6d62d5de Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 12 Nov 2014 10:16:22 +0100 Subject: [PATCH] =str #16272 splitWhen should drop reference when done with substream The completeSubstreamOutput is used to not early complete the stream, where as invalidating would shutdown the stream too early (and elements wouldn't be emitted as expected). --- .../src/test/java/akka/stream/javadsl/FlowTest.java | 6 +++--- .../akka/stream/impl/SplitWhenProcessorImpl.scala | 6 +++--- .../akka/stream/impl/StreamOfStreamProcessors.scala | 11 +++++++++-- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index dd01fcee0a..2ed90f4041 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -257,10 +257,10 @@ public class FlowTest { @Test public void mustBeAbleToUseSplitWhen() { final JavaTestKit probe = new JavaTestKit(system); - final Iterable input = Arrays.asList("A", "B", "C", "\n", "D", "\n", "E", "F"); + final Iterable input = Arrays.asList("A", "B", "C", ".", "D", ".", "E", "F"); Source.from(input).splitWhen(new Predicate() { public boolean test(String elem) { - return elem.equals("\n"); + return elem.equals("."); } }).foreach(new Procedure>() { @Override @@ -268,7 +268,7 @@ public class FlowTest { subStream.filter(new Predicate() { @Override public boolean test(String elem) { - return !elem.equals("\n"); + return !elem.equals("."); } }).grouped(10).foreach(new Procedure>() { @Override diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index e983a3b494..5f901eb8a5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -38,7 +38,7 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ val elem = primaryInputs.dequeueInputElement() if (splitPredicate(elem)) { - currentSubstream.complete() + completeSubstreamOutput(currentSubstream.key) currentSubstream = null nextPhase(openSubstream(elem)) } else substream.enqueueOutputElement(elem) @@ -52,9 +52,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val nextPhase(waitFirst) - override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { + override def completeSubstreamOutput(substream: SubstreamKey): Unit = { if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) - super.invalidateSubstreamOutput(substream) + super.completeSubstreamOutput(substream) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 2635dc807b..4b919623a2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -124,9 +124,13 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc } protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { + completeSubstreamOutput(substream) + pump() + } + + protected def completeSubstreamOutput(substream: SubstreamKey): Unit = { substreamOutputs(substream).complete() substreamOutputs -= substream - pump() } protected def failOutputs(e: Throwable): Unit = { @@ -138,7 +142,10 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc } val outputSubstreamManagement: Receive = { - case SubstreamRequestMore(key, demand) ⇒ substreamOutputs(key).enqueueOutputDemand(demand) + case SubstreamRequestMore(key, demand) ⇒ substreamOutputs.get(key) match { + case Some(substream) ⇒ substream.enqueueOutputDemand(demand) + case _ ⇒ // ignore... + } case SubstreamCancel(key) ⇒ invalidateSubstreamOutput(key) case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs(key).attachSubscriber(subscriber) }