Introduces fold as a Flow transformation and generalizes Sink.fold to be Flow.fold + Sink.head

Conflicts:
	akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala
	akka-stream/src/main/scala/akka/stream/impl/Stages.scala
	akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
This commit is contained in:
Viktor Klang 2015-06-14 03:12:30 -04:00
parent 849875ad90
commit 36abbb4234
18 changed files with 204 additions and 209 deletions

View file

@ -18,6 +18,7 @@ import akka.stream.scaladsl._
import akka.stream._
import akka.stream.io._
import akka.stream.io.SslTls.TlsModule
import akka.stream.stage.Stage
import akka.util.ByteString
import org.reactivestreams._
@ -80,8 +81,8 @@ private[akka] case class ActorMaterializerImpl(
override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes): Any = {
def newMaterializationContext() = new MaterializationContext(ActorMaterializerImpl.this,
effectiveAttributes, stageName(effectiveAttributes))
def newMaterializationContext() =
new MaterializationContext(ActorMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))
atomic match {
case sink: SinkModule[_, _]
val (sub, mat) = sink.create(newMaterializationContext())
@ -98,9 +99,10 @@ private[akka] case class ActorMaterializerImpl(
assignPort(stage.outPort, processor)
mat
case tls: TlsModule
case tls: TlsModule // TODO solve this so TlsModule doesn't need special treatment here
val es = effectiveSettings(effectiveAttributes)
val props = SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing)
val props =
SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing)
val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
def factory(id: Int) = new ActorPublisher[Any](impl) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
@ -114,7 +116,8 @@ private[akka] case class ActorMaterializerImpl(
assignPort(tls.plainIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn))
assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn))
case junction: JunctionModule materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes))
case junction: JunctionModule
materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes))
}
}
@ -125,10 +128,8 @@ private[akka] case class ActorMaterializerImpl(
case Identity(attr) (new VirtualProcessor, ())
case _
val (opprops, mat) = ActorProcessorFactory.props(ActorMaterializerImpl.this, op, effectiveAttributes)
val processor = ActorProcessorFactory[Any, Any](actorOf(
opprops,
stageName(effectiveAttributes),
effectiveSettings.dispatcher))
val processor = ActorProcessorFactory[Any, Any](
actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher))
processor -> mat
}
@ -183,18 +184,15 @@ private[akka] case class ActorMaterializerImpl(
}
val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
val size = outs.size
def factory(id: Int) = new ActorPublisher[Any](impl) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
}
def factory(id: Int) =
new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) }
val publishers =
if (outs.size < 8) Vector.tabulate(size)(factory)
else List.tabulate(size)(factory)
impl ! FanOut.ExposedPublishers(publishers)
publishers.zip(outs).foreach { case (pub, out) assignPort(out, pub) }
val subscriber = ActorSubscriber[Any](impl)
assignPort(in, subscriber)
publishers.iterator.zip(outs.iterator).foreach { case (pub, out) assignPort(out, pub) }
assignPort(in, ActorSubscriber[Any](impl))
}
}
@ -257,7 +255,8 @@ private[akka] object StreamSupervisor {
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
final case class Materialize(props: Props, name: String) extends DeadLetterSuppression with NoSerializationVerificationNeeded
final case class Materialize(props: Props, name: String)
extends DeadLetterSuppression with NoSerializationVerificationNeeded
/** Testing purpose */
final case object GetChildren
@ -294,41 +293,41 @@ private[akka] object ActorProcessorFactory {
import akka.stream.impl.Stages._
import ActorMaterializerImpl._
private val _identity = (x: Any) x
def props(materializer: ActorMaterializer, op: StageModule, parentAttributes: Attributes): (Props, Any) = {
val att = parentAttributes and op.attributes
// USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
// Also, otherwise the attributes will not affect the settings properly!
val settings = materializer.effectiveSettings(att)
def interp(s: Stage[_, _]): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, att), ())
op match {
case Identity(_) throw new AssertionError("Identity cannot end up in ActorProcessorFactory")
case Fused(ops, _) (ActorInterpreter.props(settings, ops, materializer, att), ())
case Map(f, _) (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider)), materializer, att), ())
case Filter(p, _) (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider)), materializer, att), ())
case TakeWhile(p, _) (ActorInterpreter.props(settings, List(fusing.TakeWhile(p, settings.supervisionDecider)), materializer, att), ())
case DropWhile(p, _) (ActorInterpreter.props(settings, List(fusing.DropWhile(p, settings.supervisionDecider)), materializer, att), ())
case Drop(n, _) (ActorInterpreter.props(settings, List(fusing.Drop(n)), materializer, att), ())
case Take(n, _) (ActorInterpreter.props(settings, List(fusing.Take(n)), materializer, att), ())
case Collect(pf, _) (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf)), materializer, att), ())
case Scan(z, f, _) (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer, att), ())
case Expand(s, f, _) (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer, att), ())
case Conflate(s, f, _) (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer, att), ())
case Buffer(n, s, _) (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer, att), ())
case MapConcat(f, _) (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer, att), ())
case MapAsync(p, f, _) (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ())
case MapAsyncUnordered(p, f, _) (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ())
case Grouped(n, _) (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ())
case Log(n, e, l, _) (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ())
case Map(f, _) interp(fusing.Map(f, settings.supervisionDecider))
case Filter(p, _) interp(fusing.Filter(p, settings.supervisionDecider))
case Drop(n, _) interp(fusing.Drop(n))
case Take(n, _) interp(fusing.Take(n))
case TakeWhile(p, _) interp(fusing.TakeWhile(p, settings.supervisionDecider))
case DropWhile(p, _) interp(fusing.DropWhile(p, settings.supervisionDecider))
case Collect(pf, _) interp(fusing.Collect(pf, settings.supervisionDecider))
case Scan(z, f, _) interp(fusing.Scan(z, f, settings.supervisionDecider))
case Fold(z, f, _) interp(fusing.Fold(z, f, settings.supervisionDecider))
case Expand(s, f, _) interp(fusing.Expand(s, f))
case Conflate(s, f, _) interp(fusing.Conflate(s, f, settings.supervisionDecider))
case Buffer(n, s, _) interp(fusing.Buffer(n, s))
case MapConcat(f, _) interp(fusing.MapConcat(f, settings.supervisionDecider))
case MapAsync(p, f, _) interp(fusing.MapAsync(p, f, settings.supervisionDecider))
case MapAsyncUnordered(p, f, _) interp(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider))
case Grouped(n, _) interp(fusing.Grouped(n))
case Log(n, e, l, _) interp(fusing.Log(n, e, l))
case GroupBy(f, _) (GroupByProcessorImpl.props(settings, f), ())
case PrefixAndTail(n, _) (PrefixAndTailImpl.props(settings, n), ())
case Split(d, _) (SplitWhereProcessorImpl.props(settings, d), ())
case ConcatAll(_) (ConcatAllImpl.props(materializer), ())
case StageFactory(mkStage, _) (ActorInterpreter.props(settings, List(mkStage()), materializer, att), ())
case StageFactory(mkStage, _) interp(mkStage())
case TimerTransform(mkStage, _) (TimerTransformerProcessorsImpl.props(settings, mkStage()), ())
case MaterializingStageFactory(mkStageAndMat, _)
val sm = mkStageAndMat()
(ActorInterpreter.props(settings, List(sm._1), materializer, att), sm._2)
val s_m = mkStageAndMat()
(ActorInterpreter.props(settings, List(s_m._1), materializer, att), s_m._2)
case DirectProcessor(p, m) throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory")
}
}