diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index dd01fcee0a..2ed90f4041 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -257,10 +257,10 @@ public class FlowTest { @Test public void mustBeAbleToUseSplitWhen() { final JavaTestKit probe = new JavaTestKit(system); - final Iterable input = Arrays.asList("A", "B", "C", "\n", "D", "\n", "E", "F"); + final Iterable input = Arrays.asList("A", "B", "C", ".", "D", ".", "E", "F"); Source.from(input).splitWhen(new Predicate() { public boolean test(String elem) { - return elem.equals("\n"); + return elem.equals("."); } }).foreach(new Procedure>() { @Override @@ -268,7 +268,7 @@ public class FlowTest { subStream.filter(new Predicate() { @Override public boolean test(String elem) { - return !elem.equals("\n"); + return !elem.equals("."); } }).grouped(10).foreach(new Procedure>() { @Override diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index e983a3b494..5f901eb8a5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -38,7 +38,7 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ val elem = primaryInputs.dequeueInputElement() if (splitPredicate(elem)) { - currentSubstream.complete() + completeSubstreamOutput(currentSubstream.key) currentSubstream = null nextPhase(openSubstream(elem)) } else substream.enqueueOutputElement(elem) @@ -52,9 +52,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val nextPhase(waitFirst) - override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { + override def completeSubstreamOutput(substream: SubstreamKey): Unit = { if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) - super.invalidateSubstreamOutput(substream) + super.completeSubstreamOutput(substream) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 2635dc807b..4b919623a2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -124,9 +124,13 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc } protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { + completeSubstreamOutput(substream) + pump() + } + + protected def completeSubstreamOutput(substream: SubstreamKey): Unit = { substreamOutputs(substream).complete() substreamOutputs -= substream - pump() } protected def failOutputs(e: Throwable): Unit = { @@ -138,7 +142,10 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc } val outputSubstreamManagement: Receive = { - case SubstreamRequestMore(key, demand) ⇒ substreamOutputs(key).enqueueOutputDemand(demand) + case SubstreamRequestMore(key, demand) ⇒ substreamOutputs.get(key) match { + case Some(substream) ⇒ substream.enqueueOutputDemand(demand) + case _ ⇒ // ignore... + } case SubstreamCancel(key) ⇒ invalidateSubstreamOutput(key) case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs(key).attachSubscriber(subscriber) }