From 42e9718c7ab7d8dcc3c8330fbb6fd3fb2a61aa68 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 8 Sep 2014 13:58:52 +0200 Subject: [PATCH] !str #15742 Hook-up groupBy to new DSL * Produces Flows instead of Publisher --- .../impl2/ActorBasedFlowMaterializer.scala | 8 +- .../stream/impl2/GroupByProcessorImpl.scala | 71 +++++++ .../scala/akka/stream/scaladsl2/Flow.scala | 14 ++ .../stream/scaladsl2/FlowGroupBySpec.scala | 196 ++++++++++++++++++ 4 files changed, 286 insertions(+), 3 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/impl2/GroupByProcessorImpl.scala create mode 100644 akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupBySpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 5264fc1ca5..d2b25adb50 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -4,13 +4,10 @@ package akka.stream.impl2 import java.util.concurrent.atomic.AtomicLong - import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.Await - import org.reactivestreams.{ Processor, Publisher, Subscriber } - import akka.actor._ import akka.pattern.ask import akka.stream.{ MaterializerSettings, Transformer } @@ -27,6 +24,10 @@ private[akka] object Ast { case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode + case class GroupBy(f: Any ⇒ Any) extends AstNode { + override def name = "groupBy" + } + } /** @@ -182,6 +183,7 @@ private[akka] object ActorProcessorFactory { def props(settings: MaterializerSettings, op: AstNode): Props = (op match { case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) + case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) }).withDispatcher(settings.dispatcher) def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { diff --git a/akka-stream/src/main/scala/akka/stream/impl2/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl2/GroupByProcessorImpl.scala new file mode 100644 index 0000000000..a6babacbcd --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl2/GroupByProcessorImpl.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl2 + +import akka.stream.MaterializerSettings +import akka.stream.impl.TransferPhase +import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey +import akka.stream.scaladsl2.FlowFrom +import akka.stream.impl.MultiStreamOutputProcessor + +/** + * INTERNAL API + */ +private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { + var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs] + + var pendingSubstreamOutputs: SubstreamOutputs = _ + + // No substream is open yet. If downstream cancels now, we are complete + val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + nextPhase(openSubstream(elem, key)) + } + + // some substreams are open now. 
If downstream cancels, we still continue until the substreams are closed + val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + + keyToSubstreamOutputs.get(key) match { + case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key))) + case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) + case _ ⇒ // stay + } + } + + def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + if (primaryOutputs.isClosed) { + // Just drop, we do not open any more substreams + nextPhase(waitNext) + } else { + val substreamOutput = newSubstream() + val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher + primaryOutputs.enqueueOutputElement((key, substreamFlow)) + keyToSubstreamOutputs(key) = substreamOutput + nextPhase(dispatchToSubstream(elem, substreamOutput)) + } + } + + def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = { + pendingSubstreamOutputs = substream + TransferPhase(substream.NeedsDemand) { () ⇒ + substream.enqueueOutputElement(elem) + pendingSubstreamOutputs = null + nextPhase(waitNext) + } + } + + nextPhase(waitFirst) + + override def invalidateSubstream(substream: SubstreamKey): Unit = { + if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.key) { + pendingSubstreamOutputs = null + nextPhase(waitNext) + } + super.invalidateSubstream(substream) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index a3d991025f..7a48e38edf 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -44,6 +44,20 @@ trait FlowOps[-In, +Out] extends HasNoSink[Out] { def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[In, T] = { andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) } + + /** + * This operation demultiplexes the incoming stream into separate output + * streams, one for each element key. The key is computed for each element + * using the given function. When a new key is encountered for the first time + * it is emitted to the downstream subscriber together with a fresh + * flow that will eventually produce all the elements of the substream + * for that key. Not consuming the elements from the created streams will + * stop this processor from processing more elements, therefore you must take + * care to unblock (or cancel) all of the produced streams even if you want + * to consume only one of them. + */ + def groupBy[K, U >: Out](f: Out ⇒ K): Repr[In, (K, FlowWithSource[U, U])] = + andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) } /** diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupBySpec.scala new file mode 100644 index 0000000000..ab00a75ac3 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupBySpec.scala @@ -0,0 +1,196 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. 
+ */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit._ +import org.reactivestreams.Publisher +import scala.util.control.NoStackTrace +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowGroupBySpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) + + case class StreamPuppet(p: Publisher[Int]) { + val probe = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(probe) + val subscription = probe.expectSubscription() + + def request(demand: Int): Unit = subscription.request(demand) + def expectNext(elem: Int): Unit = probe.expectNext(elem) + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + def expectComplete(): Unit = probe.expectComplete() + def expectError(e: Throwable) = probe.expectError(e) + def cancel(): Unit = subscription.cancel() + } + + class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { + val source = FlowFrom((1 to elementCount).iterator).toPublisher() + val groupStream = FlowFrom(source).groupBy(_ % groupCount).toPublisher() + val masterSubscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]() + + groupStream.subscribe(masterSubscriber) + val masterSubscription = masterSubscriber.expectSubscription() + + def getSubFlow(expectedKey: Int): FlowWithSource[Int, Int] = { + masterSubscription.request(1) + expectSubFlow(expectedKey: Int) + } + + def expectSubFlow(expectedKey: Int): FlowWithSource[Int, Int] = { + val (key, substream) = masterSubscriber.expectNext() + key should be(expectedKey) + substream + } + + } + + case class TE(message: String) extends RuntimeException(message) with NoStackTrace + + "groupBy" must { + "work in the happy case" in new SubstreamsSupport(groupCount = 2) { + val s1 = StreamPuppet(getSubFlow(1).toPublisher()) + masterSubscriber.expectNoMsg(100.millis) + + s1.expectNoMsg(100.millis) + s1.request(1) + s1.expectNext(1) + s1.expectNoMsg(100.millis) + + val s2 = StreamPuppet(getSubFlow(0).toPublisher()) + + s2.expectNoMsg(100.millis) + s2.request(2) + s2.expectNext(2) + + // Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box + s1.request(1) + s2.expectNext(4) + + s2.expectNoMsg(100.millis) + + s1.expectNext(3) + + s2.request(1) + // Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box + s1.request(1) + s2.expectNext(6) + s2.expectComplete() + + s1.expectNext(5) + s1.expectComplete() + + masterSubscriber.expectComplete() + + } + + "accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) { + StreamPuppet(getSubFlow(1).toPublisher()).cancel() + + val substream = StreamPuppet(getSubFlow(0).toPublisher()) + substream.request(2) + substream.expectNext(2) + substream.expectNext(4) + substream.expectNoMsg(100.millis) + + substream.request(2) + substream.expectNext(6) + substream.expectComplete() + + masterSubscriber.expectComplete() + + } + + "accept cancellation of master stream when not consumed anything" in { + val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() + val publisher = FlowFrom(publisherProbeProbe).groupBy(_ % 2).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]() + publisher.subscribe(subscriber) + + val 
upstreamSubscription = publisherProbeProbe.expectSubscription() + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.cancel() + upstreamSubscription.expectCancellation() + } + + "accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) { + pending + // FIXME: Needs handling of loose substreams that no one refers to anymore. + // val substream = StreamPuppet(getSubproducer(1)) + // + // substream.request(1) + // substream.expectNext(1) + // + // masterSubscription.cancel() + // masterSubscriber.expectNoMsg(100.millis) + // + // // Open substreams still work, others are discarded + // substream.request(4) + // substream.expectNext(4) + // substream.expectNext(7) + // substream.expectNext(10) + // substream.expectNext(13) + // substream.expectComplete() + } + + "work with empty input stream" in { + val publisher = FlowFrom(List.empty[Int]).groupBy(_ % 2).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]() + publisher.subscribe(subscriber) + + subscriber.expectCompletedOrSubscriptionFollowedByComplete() + } + + "abort on onError from upstream" in { + val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() + val publisher = FlowFrom(publisherProbeProbe).groupBy(_ % 2).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbeProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + val e = TE("test") + upstreamSubscription.sendError(e) + + subscriber.expectError(e) + } + + "abort on onError from upstream when substreams are running" in { + val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() + val publisher = FlowFrom(publisherProbeProbe).groupBy(_ % 2).toPublisher() + val subscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbeProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + + val (_, substream) = subscriber.expectNext() + val substreamPuppet = StreamPuppet(substream.toPublisher()) + + substreamPuppet.request(1) + substreamPuppet.expectNext(1) + + val e = TE("test") + upstreamSubscription.sendError(e) + + substreamPuppet.expectError(e) + subscriber.expectError(e) + + } + } + +}
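
Usage note (not part of the patch): a minimal sketch of the new groupBy DSL, based only on the API exercised in FlowGroupBySpec above. It assumes an implicit ActorSystem named `system` is in scope; the value names are illustrative.

    import akka.stream.MaterializerSettings
    import akka.stream.scaladsl2.{ FlowFrom, FlowMaterializer }

    // materializer backing toPublisher(), set up as in the spec above
    implicit val materializer = FlowMaterializer(MaterializerSettings(system))

    // demultiplex 1..10 by parity; each element emitted downstream is a
    // (key, substream flow) pair, here of type (Int, FlowWithSource[Int, Int])
    val groups = FlowFrom((1 to 10).iterator).groupBy(_ % 2).toPublisher()

As the groupBy scaladoc added in Flow.scala points out, every emitted substream must be drained or cancelled; otherwise the stage stops pulling further elements from upstream, even if only one key is of interest.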